This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 8124f974232 HIVE-28256: Iceberg: Major QB Compaction on partition 
level with evolution (Dmitriy Fingerman, reviewed by Denys Kuzmenko, Sourabh 
Badhya)
8124f974232 is described below

commit 8124f974232d88e40b2b6dc4833f42bd698d74ab
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Wed Jun 26 03:39:23 2024 -0400

    HIVE-28256: Iceberg: Major QB Compaction on partition level with evolution 
(Dmitriy Fingerman, reviewed by Denys Kuzmenko, Sourabh Badhya)
    
    Closes #5248
---
 .../mr/hive/HiveIcebergOutputCommitter.java        |  86 ++-
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 119 ++--
 .../apache/iceberg/mr/hive/IcebergTableUtil.java   | 145 +++++
 .../hive/compaction/IcebergCompactionService.java  |   2 +
 .../compaction/IcebergMajorQueryCompactor.java     |  30 +-
 ...or_compaction_single_partition_with_evolution.q |  96 +++
 ...r_compaction_single_partition_with_evolution2.q |  72 +++
 ...iceberg_major_compaction_single_partition.q.out |   4 +-
 ...ompaction_single_partition_with_evolution.q.out | 650 +++++++++++++++++++++
 ...mpaction_single_partition_with_evolution2.q.out | 445 ++++++++++++++
 .../test/resources/testconfiguration.properties    |   2 +
 ql/src/java/org/apache/hadoop/hive/ql/Context.java |  12 +-
 .../ql/ddl/table/AbstractAlterTableAnalyzer.java   |   5 +
 .../compact/AlterTableCompactOperation.java        |  11 +-
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |   3 +-
 .../hive/ql/metadata/HiveStorageHandler.java       |  29 +
 .../hadoop/hive/ql/parse/BaseSemanticAnalyzer.java |   2 +-
 17 files changed, 1605 insertions(+), 108 deletions(-)

diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index cb61d545b3d..66c34361fba 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -65,6 +65,7 @@ import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.OverwriteFiles;
 import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
@@ -78,6 +79,7 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.hive.compaction.IcebergCompactionService;
 import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
 import org.apache.iceberg.mr.hive.writer.WriterRegistry;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -498,7 +500,22 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
           .map(x -> x.getJobConf().get(ConfVars.REWRITE_POLICY.varname))
           .orElse(RewritePolicy.DEFAULT.name()));
 
-      commitOverwrite(table, branchName, startTime, filesForCommit, 
rewritePolicy);
+      if (rewritePolicy != RewritePolicy.DEFAULT) {
+        Integer partitionSpecId = outputTable.jobContexts.stream()
+            .findAny()
+            .map(x -> 
x.getJobConf().get(IcebergCompactionService.PARTITION_SPEC_ID))
+            .map(Integer::valueOf)
+            .orElse(null);
+
+        String partitionPath = outputTable.jobContexts.stream()
+            .findAny()
+            .map(x -> 
x.getJobConf().get(IcebergCompactionService.PARTITION_PATH))
+            .orElse(null);
+
+        commitCompaction(table, startTime, filesForCommit, rewritePolicy, 
partitionSpecId, partitionPath);
+      } else {
+        commitOverwrite(table, branchName, startTime, filesForCommit);
+      }
     }
   }
 
@@ -574,34 +591,73 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
     LOG.debug("Added files {}", results);
   }
 
+  /**
+   * Creates and commits an Iceberg compaction change with the provided data 
files.
+   * The contents of either the full table or a selected partition are replaced with 
compacted files.
+   *
+   * @param table             The table we are changing
+   * @param startTime         The start time of the commit - used only for 
logging
+   * @param results           The object containing the new files
+   * @param rewritePolicy     The rewrite policy selecting full-table or single-partition 
compaction
+   * @param partitionSpecId   The table spec_id for partition compaction 
operation
+   * @param partitionPath     The path of the compacted partition
+   */
+  private void commitCompaction(Table table, long startTime, FilesForCommit 
results,
+      RewritePolicy rewritePolicy, Integer partitionSpecId, String 
partitionPath) {
+    if (results.dataFiles().isEmpty()) {
+      LOG.info("Empty compaction commit, took {} ms for table: {}", 
System.currentTimeMillis() - startTime, table);
+      return;
+    }
+    if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) {
+      // Full table compaction
+      Transaction transaction = table.newTransaction();
+      DeleteFiles delete = transaction.newDelete();
+      delete.deleteFromRowFilter(Expressions.alwaysTrue());
+      delete.commit();
+      ReplacePartitions overwrite = transaction.newReplacePartitions();
+      results.dataFiles().forEach(overwrite::addFile);
+      overwrite.commit();
+      transaction.commitTransaction();
+      LOG.debug("Compacted full table with files {}", results);
+    } else {
+      // Single partition compaction
+      List<DataFile> existingDataFiles = IcebergTableUtil.getDataFiles(table, 
partitionSpecId, partitionPath);
+      List<DeleteFile> existingDeleteFiles = 
IcebergTableUtil.getDeleteFiles(table, partitionSpecId, partitionPath);
+
+      RewriteFiles rewriteFiles = table.newRewrite();
+      rewriteFiles.validateFromSnapshot(table.currentSnapshot().snapshotId());
+
+      existingDataFiles.forEach(rewriteFiles::deleteFile);
+      existingDeleteFiles.forEach(rewriteFiles::deleteFile);
+      results.dataFiles().forEach(rewriteFiles::addFile);
+
+      rewriteFiles.commit();
+      LOG.debug("Compacted partition {} with files {}", partitionPath, 
results);
+    }
+    LOG.info("Compaction commit took {} ms for table: {} with {} file(s)", 
System.currentTimeMillis() - startTime,
+        table, results.dataFiles().size());
+  }
+
   /**
    * Creates and commits an Iceberg insert overwrite change with the provided 
data files.
    * For unpartitioned tables the table content is replaced with the new data 
files. If no data files are provided
    * then the unpartitioned table is truncated.
    * For partitioned tables the relevant partitions are replaced with the new 
data files. If no data files are provided
    * then the partitioned table remains unchanged.
-   * @param table The table we are changing
-   * @param startTime The start time of the commit - used only for logging
-   * @param results The object containing the new files
-   * @param rewritePolicy The rewrite policy to use for the insert overwrite 
commit
+   *
+   * @param table                   The table we are changing
+   * @param startTime               The start time of the commit - used only 
for logging
+   * @param results                 The object containing the new files
    */
-  private void commitOverwrite(Table table, String branchName, long startTime, 
FilesForCommit results,
-      RewritePolicy rewritePolicy) {
+  private void commitOverwrite(Table table, String branchName, long startTime, 
FilesForCommit results) {
     Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not 
handle deletes with overwrite");
     if (!results.dataFiles().isEmpty()) {
-      Transaction transaction = table.newTransaction();
-      if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) {
-        DeleteFiles delete = transaction.newDelete();
-        delete.deleteFromRowFilter(Expressions.alwaysTrue());
-        delete.commit();
-      }
-      ReplacePartitions overwrite = transaction.newReplacePartitions();
+      ReplacePartitions overwrite = table.newReplacePartitions();
       results.dataFiles().forEach(overwrite::addFile);
       if (StringUtils.isNotEmpty(branchName)) {
         overwrite.toBranch(HiveUtils.getTableSnapshotRef(branchName));
       }
       overwrite.commit();
-      transaction.commitTransaction();
       LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)", 
System.currentTimeMillis() - startTime,
           table, results.dataFiles().size());
     } else if (table.spec().isUnpartitioned()) {
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 9ed58c3f059..6926340485a 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -199,7 +199,6 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.Pair;
@@ -216,8 +215,6 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
   private static final Splitter TABLE_NAME_SPLITTER = Splitter.on("..");
   private static final String TABLE_NAME_SEPARATOR = "..";
   // Column index for partition metadata table
-  private static final int SPEC_IDX = 1;
-  private static final int PART_IDX = 0;
   public static final String COPY_ON_WRITE = "copy-on-write";
   public static final String MERGE_ON_READ = "merge-on-read";
   public static final String STATS = "/stats/snap-";
@@ -383,7 +380,8 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
 
     List<ExprNodeDesc> subExprNodes = pushedPredicate.getChildren();
     if (subExprNodes.removeIf(nodeDesc -> nodeDesc.getCols() != null &&
-        nodeDesc.getCols().contains(VirtualColumn.FILE_PATH.getName()))) {
+        (nodeDesc.getCols().contains(VirtualColumn.FILE_PATH.getName()) ||
+            
nodeDesc.getCols().contains(VirtualColumn.PARTITION_SPEC_ID.getName())))) {
       if (subExprNodes.size() == 1) {
         pushedPredicate = subExprNodes.get(0);
       } else if (subExprNodes.isEmpty()) {
@@ -1146,8 +1144,7 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
         // If the table is empty we don't have any danger that some data can 
get lost.
         return;
       }
-      if (RewritePolicy.fromString(conf.get(ConfVars.REWRITE_POLICY.varname, 
RewritePolicy.DEFAULT.name())) ==
-          RewritePolicy.ALL_PARTITIONS) {
+      if (RewritePolicy.fromString(conf.get(ConfVars.REWRITE_POLICY.varname)) 
!= RewritePolicy.DEFAULT) {
         // Table rewriting has special logic as part of IOW that handles the 
case when table had a partition evolution
         return;
       }
@@ -1813,7 +1810,8 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
         String[] row =
             fetcher.convert(currSerDe.deserialize(value), 
currSerDe.getObjectInspector())
                 .toString().split("\t");
-        parts.add(HiveTableUtil.getParseData(row[PART_IDX], row[SPEC_IDX], 
mapper, tbl.spec().specId()));
+        parts.add(HiveTableUtil.getParseData(row[IcebergTableUtil.PART_IDX], 
row[IcebergTableUtil.SPEC_IDX],
+            mapper, tbl.spec().specId()));
       }
     }
     Collections.sort(parts);
@@ -1821,8 +1819,16 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
   }
 
   @Override
-  public void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table 
hmsTable, Map<String, String> partitionSpec)
-      throws SemanticException {
+  public void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table 
hmsTable, Map<String, String> partitionSpec,
+      Context.RewritePolicy policy) throws SemanticException {
+    Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
+    List<PartitionField> partitionFields = (policy == 
Context.RewritePolicy.PARTITION) ?
+        IcebergTableUtil.getPartitionFields(table) : table.spec().fields();
+    validatePartSpecImpl(hmsTable, partitionSpec, partitionFields);
+  }
+
+  private void validatePartSpecImpl(org.apache.hadoop.hive.ql.metadata.Table 
hmsTable,
+      Map<String, String> partitionSpec, List<PartitionField> partitionFields) 
throws SemanticException {
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
     if (hmsTable.getSnapshotRef() != null && 
hasUndergonePartitionEvolution(table)) {
       // for this case we rewrite the query as delete query, so validations 
would be done as part of delete.
@@ -1834,7 +1840,7 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
     }
 
     Map<String, Types.NestedField> mapOfPartColNamesWithTypes = 
Maps.newHashMap();
-    for (PartitionField partField : table.spec().fields()) {
+    for (PartitionField partField : partitionFields) {
       Types.NestedField field = table.schema().findField(partField.sourceId());
       mapOfPartColNamesWithTypes.put(field.name(), field);
     }
@@ -1877,7 +1883,7 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
       return false;
     }
 
-    Expression finalExp = generateExpressionFromPartitionSpec(table, 
partitionSpec);
+    Expression finalExp = 
IcebergTableUtil.generateExpressionFromPartitionSpec(table, partitionSpec);
     FindFiles.Builder builder = new 
FindFiles.Builder(table).withRecordsMatching(finalExp);
     Set<DataFile> dataFiles = Sets.newHashSet(builder.collect());
     boolean result = true;
@@ -1909,14 +1915,8 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
   @Override
   public Optional<ErrorMsg> isEligibleForCompaction(
       org.apache.hadoop.hive.ql.metadata.Table table, Map<String, String> 
partitionSpec) {
-    if (partitionSpec != null) {
-      Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
-      if (hasUndergonePartitionEvolution(icebergTable)) {
-        return Optional.of(ErrorMsg.COMPACTION_PARTITION_EVOLUTION);
-      }
-      if (!isIdentityPartitionTable(table)) {
-        return Optional.of(ErrorMsg.COMPACTION_NON_IDENTITY_PARTITION_SPEC);
-      }
+    if (partitionSpec != null && !isIdentityPartitionTable(table)) {
+      return Optional.of(ErrorMsg.COMPACTION_NON_IDENTITY_PARTITION_SPEC);
     }
     return Optional.empty();
   }
@@ -1925,13 +1925,22 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
   public List<Partition> 
getPartitions(org.apache.hadoop.hive.ql.metadata.Table table,
       Map<String, String> partitionSpec) throws SemanticException {
     return getPartitionNames(table, partitionSpec).stream()
-        .map(partName -> new DummyPartition(table, partName, 
partitionSpec)).collect(Collectors.toList());
+        .map(partName -> {
+          Map<String, String> partSpecMap = Maps.newLinkedHashMap();
+          Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
+          return new DummyPartition(table, partName, partSpecMap);
+        }).collect(Collectors.toList());
   }
 
   @Override
   public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
+      Map<String, String> partitionSpec, Context.RewritePolicy policy) throws 
SemanticException {
+    validatePartSpec(table, partitionSpec, policy);
+    return getPartitionImpl(table, partitionSpec);
+  }
+
+  private Partition getPartitionImpl(org.apache.hadoop.hive.ql.metadata.Table 
table,
       Map<String, String> partitionSpec) throws SemanticException {
-    validatePartSpec(table, partitionSpec);
     try {
       String partName = Warehouse.makePartName(partitionSpec, false);
       return new DummyPartition(table, partName, partitionSpec);
@@ -1951,62 +1960,19 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
   public List<String> 
getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec) throws SemanticException {
     Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
-            .createMetadataTableInstance(icebergTable, 
MetadataTableType.PARTITIONS);
-    Expression expression = generateExpressionFromPartitionSpec(icebergTable, 
partitionSpec);
-    Set<PartitionData> partitionList = Sets.newHashSet();
-    try (CloseableIterable<FileScanTask> fileScanTasks = 
partitionsTable.newScan().planFiles()) {
-      fileScanTasks.forEach(task -> {
-        
partitionList.addAll(Sets.newHashSet(CloseableIterable.transform(task.asDataTask().rows(),
 row -> {
-          StructProjection data = row.get(PART_IDX, StructProjection.class);
-          PartitionSpec pSpec = icebergTable.spec();
-          PartitionData partitionData = new 
PartitionData(pSpec.partitionType());
-          for (int index = 0; index < pSpec.fields().size(); index++) {
-            partitionData.set(index, data.get(index, Object.class));
-          }
-          return partitionData;
-        })));
-      });
 
-      List<String> partPathList = partitionList.stream().filter(partitionData 
-> {
-        ResidualEvaluator resEval = ResidualEvaluator.of(icebergTable.spec(), 
expression, false);
-        return 
resEval.residualFor(partitionData).isEquivalentTo(Expressions.alwaysTrue());
-      }).map(partitionData -> 
icebergTable.spec().partitionToPath(partitionData)).collect(Collectors.toList());
-
-      return partPathList;
+    try {
+      return IcebergTableUtil
+          .getPartitionInfo(icebergTable, partitionSpec, 
true).entrySet().stream().map(e -> {
+            PartitionData partitionData = e.getKey();
+            int specId = e.getValue();
+            return 
icebergTable.specs().get(specId).partitionToPath(partitionData);
+          }).collect(Collectors.toList());
     } catch (IOException e) {
       throw new SemanticException(String.format("Error while fetching the 
partitions due to: %s", e));
     }
   }
 
-  private Expression generateExpressionFromPartitionSpec(Table table, 
Map<String, String> partitionSpec)
-      throws SemanticException {
-    Map<String, PartitionField> partitionFieldMap = 
table.spec().fields().stream()
-        .collect(Collectors.toMap(PartitionField::name, Function.identity()));
-    Expression finalExp = Expressions.alwaysTrue();
-    for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
-      String partColName = entry.getKey();
-      if (partitionFieldMap.containsKey(partColName)) {
-        PartitionField partitionField = partitionFieldMap.get(partColName);
-        Type resultType = 
partitionField.transform().getResultType(table.schema()
-                .findField(partitionField.sourceId()).type());
-        Object value = Conversions.fromPartitionString(resultType, 
entry.getValue());
-        TransformSpec.TransformType transformType = 
TransformSpec.fromString(partitionField.transform().toString());
-        Iterable<?> iterable = () -> 
Collections.singletonList(value).iterator();
-        if (TransformSpec.TransformType.IDENTITY == transformType) {
-          Expression boundPredicate = Expressions.in(partitionField.name(), 
iterable);
-          finalExp = Expressions.and(finalExp, boundPredicate);
-        } else {
-          throw new SemanticException(
-                  String.format("Partition transforms are not supported via 
truncate operation: %s", partColName));
-        }
-      } else {
-        throw new SemanticException(String.format("No partition 
column/transform by the name: %s", partColName));
-      }
-    }
-    return finalExp;
-  }
-
   /**
    * A function to fetch the column information of the underlying column 
defined by the table format.
    * @param hmsTable A Hive table instance
@@ -2089,14 +2055,7 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
   @Override
   public List<FieldSchema> 
getPartitionKeys(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    Schema schema = icebergTable.schema();
-    List<FieldSchema> hiveSchema = HiveSchemaUtil.convert(schema);
-    Map<String, String> colNameToColType = hiveSchema.stream()
-        .collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
-    return icebergTable.spec().fields().stream().map(partField ->
-      new FieldSchema(schema.findColumnName(partField.sourceId()),
-      colNameToColType.get(schema.findColumnName(partField.sourceId())),
-      String.format("Transform: %s", 
partField.transform().toString()))).collect(Collectors.toList());
+    return IcebergTableUtil.getPartitionKeys(icebergTable, 
icebergTable.spec().specId());
   }
 
   @Override
@@ -2113,7 +2072,7 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
     try (CloseableIterable<FileScanTask> fileScanTasks = 
partitionsTable.newScan().planFiles()) {
       fileScanTasks.forEach(task ->
           
partitionList.addAll(Sets.newHashSet(CloseableIterable.transform(task.asDataTask().rows(),
 row -> {
-            StructProjection data = row.get(PART_IDX, StructProjection.class);
+            StructProjection data = row.get(IcebergTableUtil.PART_IDX, 
StructProjection.class);
             return IcebergTableUtil.toPartitionData(data, 
pSpec.partitionType());
           })).stream()
              .filter(partitionData -> 
resEval.residualFor(partitionData).isEquivalentTo(Expressions.alwaysTrue()))
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 653372d7988..f3d9d98f868 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -19,30 +19,45 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.function.BinaryOperator;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManageSnapshots;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PartitionsTable;
+import org.apache.iceberg.PositionDeletesScanTask;
 import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.StructLike;
@@ -52,14 +67,24 @@ import org.apache.iceberg.UpdatePartitionSpec;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class IcebergTableUtil {
 
+  public static final int SPEC_IDX = 1;
+  public static final int PART_IDX = 0;
   private static final Logger LOG = 
LoggerFactory.getLogger(IcebergTableUtil.class);
 
   private IcebergTableUtil() {
@@ -345,4 +370,124 @@ public class IcebergTableUtil {
     return data;
   }
 
+  public static PartitionData toPartitionData(StructLike sourceKey, 
Types.StructType sourceKeyType,
+      Types.StructType targetKeyType) {
+    PartitionData data = new PartitionData(targetKeyType);
+    for (int i = 0; i < targetKeyType.fields().size(); i++) {
+
+      int fi = i;
+      String fieldName = targetKeyType.fields().get(fi).name();
+      Object val = sourceKeyType.fields().stream()
+          .filter(f -> f.name().equals(fieldName)).findFirst()
+          .map(sourceKeyElem -> 
sourceKey.get(sourceKeyType.fields().indexOf(sourceKeyElem),
+              targetKeyType.fields().get(fi).type().typeId().javaClass()))
+          .orElseThrow(() -> new RuntimeException(
+              String.format("Error retrieving value of partition field %s", 
fieldName)));
+
+      if (val != null) {
+        data.set(fi, val);
+      } else {
+        throw new RuntimeException(String.format("Partition field's %s value 
is null", fieldName));
+      }
+    }
+    return data;
+  }
+
+  public static List<DataFile> getDataFiles(Table table, int specId,
+      String partitionPath) {
+    CloseableIterable<FileScanTask> fileScanTasks =
+        
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
+    CloseableIterable<FileScanTask> filteredFileScanTasks =
+        CloseableIterable.filter(fileScanTasks, t -> {
+          DataFile file = t.asFileScanTask().file();
+          return file.specId() == specId && table.specs()
+              
.get(specId).partitionToPath(file.partition()).equals(partitionPath);
+        });
+    return 
Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> 
t.file()));
+  }
+
+  public static List<DeleteFile> getDeleteFiles(Table table, int specId, 
String partitionPath) {
+    Table deletesTable =
+        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.POSITION_DELETES);
+    CloseableIterable<ScanTask> deletesScanTasks = 
deletesTable.newBatchScan().planFiles();
+    CloseableIterable<ScanTask> filteredDeletesScanTasks =
+        CloseableIterable.filter(deletesScanTasks, t -> {
+          DeleteFile file = ((PositionDeletesScanTask) t).file();
+          return file.specId() == specId && table.specs()
+              
.get(specId).partitionToPath(file.partition()).equals(partitionPath);
+        });
+    return 
Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
+        t -> ((PositionDeletesScanTask) t).file()));
+  }
+
+  public static Expression generateExpressionFromPartitionSpec(Table table, 
Map<String, String> partitionSpec)
+      throws SemanticException {
+    Map<String, PartitionField> partitionFieldMap = 
getPartitionFields(table).stream()
+        .collect(Collectors.toMap(PartitionField::name, Function.identity()));
+    Expression finalExp = Expressions.alwaysTrue();
+    for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
+      String partColName = entry.getKey();
+      if (partitionFieldMap.containsKey(partColName)) {
+        PartitionField partitionField = partitionFieldMap.get(partColName);
+        Type resultType = 
partitionField.transform().getResultType(table.schema()
+            .findField(partitionField.sourceId()).type());
+        Object value = Conversions.fromPartitionString(resultType, 
entry.getValue());
+        TransformSpec.TransformType transformType = 
TransformSpec.fromString(partitionField.transform().toString());
+        Iterable<?> iterable = () -> 
Collections.singletonList(value).iterator();
+        if (TransformSpec.TransformType.IDENTITY == transformType) {
+          Expression boundPredicate = Expressions.in(partitionField.name(), 
iterable);
+          finalExp = Expressions.and(finalExp, boundPredicate);
+        } else {
+          throw new SemanticException(
+              String.format("Partition transforms are not supported via 
truncate operation: %s", partColName));
+        }
+      } else {
+        throw new SemanticException(String.format("No partition 
column/transform by the name: %s", partColName));
+      }
+    }
+    return finalExp;
+  }
+
+  public static List<FieldSchema> getPartitionKeys(Table table, int specId) {
+    Schema schema = table.specs().get(specId).schema();
+    List<FieldSchema> hiveSchema = HiveSchemaUtil.convert(schema);
+    Map<String, String> colNameToColType = hiveSchema.stream()
+        .collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
+    return table.specs().get(specId).fields().stream().map(partField ->
+        new FieldSchema(schema.findColumnName(partField.sourceId()),
+            colNameToColType.get(schema.findColumnName(partField.sourceId())),
+            String.format("Transform: %s", 
partField.transform().toString()))).collect(Collectors.toList());
+  }
+
+  public static List<PartitionField> getPartitionFields(Table table) {
+    return table.specs().values().stream().flatMap(spec -> spec.fields()
+        .stream()).distinct().collect(Collectors.toList());
+  }
+
+  public static Map<PartitionData, Integer> getPartitionInfo(Table 
icebergTable, Map<String, String> partSpecMap,
+      boolean allowPartialSpec) throws SemanticException, IOException {
+    Expression expression = 
IcebergTableUtil.generateExpressionFromPartitionSpec(icebergTable, partSpecMap);
+    PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
+        .createMetadataTableInstance(icebergTable, 
MetadataTableType.PARTITIONS);
+
+    Map<PartitionData, Integer> result = Maps.newLinkedHashMap();
+    try (CloseableIterable<FileScanTask> fileScanTasks = 
partitionsTable.newScan().planFiles()) {
+      fileScanTasks.forEach(task ->
+          CloseableIterable.filter(
+              CloseableIterable.transform(task.asDataTask().rows(), row -> {
+                StructProjection data = row.get(IcebergTableUtil.PART_IDX, 
StructProjection.class);
+                Integer specId = row.get(IcebergTableUtil.SPEC_IDX, 
Integer.class);
+                return 
Maps.immutableEntry(IcebergTableUtil.toPartitionData(data,
+                    Partitioning.partitionType(icebergTable), 
icebergTable.specs().get(specId).partitionType()),
+                    specId);
+              }), entry -> {
+                ResidualEvaluator resEval = 
ResidualEvaluator.of(icebergTable.specs().get(entry.getValue()),
+                    expression, false);
+                return 
resEval.residualFor(entry.getKey()).isEquivalentTo(Expressions.alwaysTrue()) &&
+                    (entry.getKey().size() == partSpecMap.size() || 
allowPartialSpec);
+              }).forEach(entry -> result.put(entry.getKey(), 
entry.getValue())));
+    }
+
+    return result;
+  }
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java
index 7251f6965bc..b69714544d8 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java
@@ -29,6 +29,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class IcebergCompactionService extends CompactionService {
+  public static final String PARTITION_SPEC_ID = "compaction_part_spec_id";
+  public static final String PARTITION_PATH = "compaction_partition_path";
   private static final String CLASS_NAME = 
IcebergCompactionService.class.getName();
   private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
index d55172715f7..6173c804b4b 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
@@ -32,15 +33,20 @@ import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context.RewritePolicy;
 import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
 import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.mr.hive.IcebergTableUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,7 +74,24 @@ public class IcebergMajorQueryCompactor extends 
QueryCompactor  {
       Map<String, String> partSpecMap = new LinkedHashMap<>();
       Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null);
 
-      List<FieldSchema> partitionKeys = 
table.getStorageHandler().getPartitionKeys(table);
+      Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
+      Map<PartitionData, Integer> partitionInfo = IcebergTableUtil
+          .getPartitionInfo(icebergTable, partSpecMap, false);
+      Optional<Integer> specId = partitionInfo.values().stream().findFirst();
+
+      if (!specId.isPresent()) {
+        throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
+      }
+
+      if (partitionInfo.size() > 1) {
+        throw new HiveException(ErrorMsg.TOO_MANY_COMPACTION_PARTITIONS);
+      }
+
+      HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, 
RewritePolicy.PARTITION.name());
+      conf.set(IcebergCompactionService.PARTITION_SPEC_ID, 
String.valueOf(specId.get()));
+      conf.set(IcebergCompactionService.PARTITION_PATH, new 
Path(partSpec).toString());
+
+      List<FieldSchema> partitionKeys = 
IcebergTableUtil.getPartitionKeys(icebergTable, specId.get());
       List<String> partValues = partitionKeys.stream().map(
           fs -> String.join("=", HiveUtils.unparseIdentifier(fs.getName()),
               
TypeInfoUtils.convertStringToLiteralForSQL(partSpecMap.get(fs.getName()),
@@ -81,11 +104,12 @@ public class IcebergMajorQueryCompactor extends 
QueryCompactor  {
           .filter(col -> !partSpecMap.containsKey(col))
           .collect(Collectors.joining(","));
 
-      compactionQuery = String.format("insert overwrite table %1$s 
partition(%2$s) select %4$s from %1$s where %3$s",
+      compactionQuery = String.format("insert overwrite table %1$s 
partition(%2$s) " +
+              "select %4$s from %1$s where %3$s and %6$s = %5$d",
           compactTableName,
           StringUtils.join(partValues, ","),
           StringUtils.join(partValues, " and "),
-          queryFields);
+          queryFields, specId.get(), 
VirtualColumn.PARTITION_SPEC_ID.getName());
     }
 
     SessionState sessionState = setupQueryCompactionSession(conf, 
context.getCompactionInfo(), tblProperties);
diff --git 
a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_single_partition_with_evolution.q
 
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_single_partition_with_evolution.q
new file mode 100644
index 00000000000..c724b249be6
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_single_partition_with_evolution.q
@@ -0,0 +1,96 @@
+-- SORT_QUERY_RESULTS
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+-- Mask random uuid
+--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
+-- Mask added file size
+--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
+--! 
qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+-- Mask compaction id as they will be allocated in parallel threads
+--! qt:replace:/^[0-9]/#Masked#/
+
+set hive.llap.io.enabled=true;
+set hive.vectorized.execution.enabled=true;
+set hive.optimize.shared.work.merge.ts.schema=true;
+
+create table ice_orc (
+    first_name string, 
+    last_name string,
+    registration_date date
+ )
+partitioned by (dept_id bigint, 
+                city string)
+stored by iceberg stored as orc 
+tblproperties ('format-version'='2');
+
+insert into ice_orc partition(dept_id=1, city='London') VALUES 
+('fn1','ln1','2024-03-11'),
+('fn2','ln2','2024-03-11');
+
+insert into ice_orc partition(dept_id=1, city='London') VALUES 
+('fn3','ln3','2024-03-11'),
+('fn4','ln4','2024-03-11');
+
+insert into ice_orc partition(dept_id=2, city='Paris') VALUES
+('fn5','ln5','2024-02-16'),
+('fn6','ln6','2024-02-16');
+
+insert into ice_orc partition(dept_id=2, city='Paris') VALUES
+('fn7','ln7','2024-02-16'),
+('fn8','ln8','2024-02-16');
+
+alter table ice_orc set partition spec(dept_id, city, registration_date);
+
+insert into ice_orc partition(dept_id=1, city='London', 
registration_date='2024-03-11') VALUES 
+('fn9','ln9'),
+('fn10','ln10');
+
+insert into ice_orc partition(dept_id=1, city='London', 
registration_date='2024-03-11') VALUES 
+('fn11','ln11'),
+('fn12','ln12');
+
+insert into ice_orc partition(dept_id=2, city='Paris', 
registration_date='2024-02-16') VALUES
+('fn13','ln13'),
+('fn14','ln14');
+
+insert into ice_orc partition(dept_id=2, city='Paris', 
registration_date='2024-02-16') VALUES
+('fn15','ln15'),
+('fn16','ln16');
+
+delete from ice_orc where last_name in ('ln1', 'ln3', 'ln5', 'ln7', 'ln9', 
'ln11', 'ln13', 'ln15');
+
+select * from ice_orc;
+describe formatted ice_orc;
+
+explain alter table ice_orc PARTITION (dept_id=1, city='London', 
registration_date='2024-03-11') COMPACT 'major' and wait;
+alter table ice_orc PARTITION (dept_id=1, city='London', 
registration_date='2024-03-11') COMPACT 'major' and wait;
+
+select * from ice_orc;
+describe formatted ice_orc;
+
+explain alter table ice_orc PARTITION (dept_id=2, city='Paris', 
registration_date='2024-02-16') COMPACT 'major' and wait;
+alter table ice_orc PARTITION (dept_id=2, city='Paris', 
registration_date='2024-02-16') COMPACT 'major' and wait;
+
+select * from ice_orc;
+describe formatted ice_orc;
+
+explain alter table ice_orc PARTITION (dept_id=1, city='London') COMPACT 
'major' and wait;
+alter table ice_orc PARTITION (dept_id=1, city='London') COMPACT 'major' and 
wait;
+
+select * from ice_orc;
+describe formatted ice_orc;
+
+explain alter table ice_orc PARTITION (dept_id=2, city='Paris') COMPACT 
'major' and wait;
+alter table ice_orc PARTITION (dept_id=2, city='Paris') COMPACT 'major' and 
wait;
+
+select * from ice_orc;
+describe formatted ice_orc;
+show compactions;
diff --git 
a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_single_partition_with_evolution2.q
 
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_single_partition_with_evolution2.q
new file mode 100644
index 00000000000..fa0de6ae6c4
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_single_partition_with_evolution2.q
@@ -0,0 +1,72 @@
+-- SORT_QUERY_RESULTS
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+-- Mask random uuid
+--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
+-- Mask added file size
+--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
+--! 
qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+-- Mask compaction id as they will be allocated in parallel threads
+--! qt:replace:/^[0-9]/#Masked#/
+
+set hive.llap.io.enabled=true;
+set hive.vectorized.execution.enabled=true;
+set hive.optimize.shared.work.merge.ts.schema=true;
+
+create table ice_orc (
+    a string
+ )
+partitioned by (b bigint)
+stored by iceberg stored as orc 
+tblproperties ('format-version'='2');
+
+insert into ice_orc partition(b=1) VALUES 
+('a1'),
+('a2'),
+('a3');
+
+insert into ice_orc partition(b=1) VALUES
+('a4'),
+('a5'),
+('a6');
+
+alter table ice_orc set partition spec(a);
+
+insert into ice_orc partition (a='a') VALUES 
+(1),
+(2),
+(3);
+
+insert into ice_orc partition (a='a') VALUES 
+(4),
+(5),
+(6);
+
+select * from ice_orc;
+describe formatted ice_orc;
+
+delete from ice_orc where a in ('a2', 'a4');
+delete from ice_orc where b in (3, 6);
+
+select * from ice_orc;
+describe formatted ice_orc;
+
+explain alter table ice_orc partition(a='a') compact 'major' and wait;
+alter table ice_orc partition(a='a') compact 'major' and wait;
+
+select * from ice_orc;
+describe formatted ice_orc;
+
+explain alter table ice_orc partition(b=1) compact 'major' and wait;
+alter table ice_orc partition(b=1) compact 'major' and wait;
+
+select * from ice_orc;
+describe formatted ice_orc;
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition.q.out
index b2aeeac2e50..41dd8dc89fa 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition.q.out
@@ -303,7 +303,7 @@ Table Parameters:
        bucketing_version       2                   
        current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"city\",\"required\":false,\"type\":\"string\"},{\"id\":5,\"name\":\"registration_date\",\"required\":false,\"type\":\"date\"}]}
        current-snapshot-id     #Masked#
-       current-snapshot-summary        
{\"replace-partitions\":\"true\",\"added-data-files\":\"1\",\"deleted-data-files\":\"6\",\"removed-position-delete-files\":\"4\",\"removed-delete-files\":\"4\",\"added-records\":\"2\",\"deleted-records\":\"6\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"4\",\"changed-partition-count\":\"1\",\"total-records\":\"7\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"6\",\"total-delete-files\":\"3\",\"t
 [...]
+       current-snapshot-summary        
{\"added-data-files\":\"1\",\"deleted-data-files\":\"6\",\"removed-position-delete-files\":\"4\",\"removed-delete-files\":\"4\",\"added-records\":\"2\",\"deleted-records\":\"6\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"4\",\"changed-partition-count\":\"1\",\"total-records\":\"7\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"6\",\"total-delete-files\":\"3\",\"total-position-deletes\":\"3\",\"
 [...]
        current-snapshot-timestamp-ms   #Masked#       
        default-partition-spec  
{\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000},{\"name\":\"city\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1001},{\"name\":\"registration_date\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1002}]}
        format-version          2                   
@@ -413,7 +413,7 @@ Table Parameters:
        bucketing_version       2                   
        current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"city\",\"required\":false,\"type\":\"string\"},{\"id\":5,\"name\":\"registration_date\",\"required\":false,\"type\":\"date\"}]}
        current-snapshot-id     #Masked#
-       current-snapshot-summary        
{\"replace-partitions\":\"true\",\"added-data-files\":\"1\",\"deleted-data-files\":\"5\",\"removed-position-delete-files\":\"3\",\"removed-delete-files\":\"3\",\"added-records\":\"2\",\"deleted-records\":\"5\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"3\",\"changed-partition-count\":\"1\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"t
 [...]
+       current-snapshot-summary        
{\"added-data-files\":\"1\",\"deleted-data-files\":\"5\",\"removed-position-delete-files\":\"3\",\"removed-delete-files\":\"3\",\"added-records\":\"2\",\"deleted-records\":\"5\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"3\",\"changed-partition-count\":\"1\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"
 [...]
        current-snapshot-timestamp-ms   #Masked#       
        default-partition-spec  
{\"spec-id\":0,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1000},{\"name\":\"city\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1001},{\"name\":\"registration_date\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1002}]}
        format-version          2                   
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution.q.out
new file mode 100644
index 00000000000..806675b676f
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution.q.out
@@ -0,0 +1,650 @@
+PREHOOK: query: create table ice_orc (
+    first_name string, 
+    last_name string,
+    registration_date date
+ )
+partitioned by (dept_id bigint, 
+                city string)
+stored by iceberg stored as orc 
+tblproperties ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: create table ice_orc (
+    first_name string, 
+    last_name string,
+    registration_date date
+ )
+partitioned by (dept_id bigint, 
+                city string)
+stored by iceberg stored as orc 
+tblproperties ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: insert into ice_orc partition(dept_id=1, city='London') VALUES 
+('fn1','ln1','2024-03-11'),
+('fn2','ln2','2024-03-11')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc@dept_id=1/city=London
+POSTHOOK: query: insert into ice_orc partition(dept_id=1, city='London') 
VALUES 
+('fn1','ln1','2024-03-11'),
+('fn2','ln2','2024-03-11')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc@dept_id=1/city=London
+PREHOOK: query: insert into ice_orc partition(dept_id=1, city='London') VALUES 
+('fn3','ln3','2024-03-11'),
+('fn4','ln4','2024-03-11')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc@dept_id=1/city=London
+POSTHOOK: query: insert into ice_orc partition(dept_id=1, city='London') 
VALUES 
+('fn3','ln3','2024-03-11'),
+('fn4','ln4','2024-03-11')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc@dept_id=1/city=London
+PREHOOK: query: insert into ice_orc partition(dept_id=2, city='Paris') VALUES
+('fn5','ln5','2024-02-16'),
+('fn6','ln6','2024-02-16')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc@dept_id=2/city=Paris
+POSTHOOK: query: insert into ice_orc partition(dept_id=2, city='Paris') VALUES
+('fn5','ln5','2024-02-16'),
+('fn6','ln6','2024-02-16')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc@dept_id=2/city=Paris
+PREHOOK: query: insert into ice_orc partition(dept_id=2, city='Paris') VALUES
+('fn7','ln7','2024-02-16'),
+('fn8','ln8','2024-02-16')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc@dept_id=2/city=Paris
+POSTHOOK: query: insert into ice_orc partition(dept_id=2, city='Paris') VALUES
+('fn7','ln7','2024-02-16'),
+('fn8','ln8','2024-02-16')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc@dept_id=2/city=Paris
+PREHOOK: query: alter table ice_orc set partition spec(dept_id, city, 
registration_date)
+PREHOOK: type: ALTERTABLE_SETPARTSPEC
+PREHOOK: Input: default@ice_orc
+POSTHOOK: query: alter table ice_orc set partition spec(dept_id, city, 
registration_date)
+POSTHOOK: type: ALTERTABLE_SETPARTSPEC
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: insert into ice_orc partition(dept_id=1, city='London', 
registration_date='2024-03-11') VALUES 
+('fn9','ln9'),
+('fn10','ln10')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: 
default@ice_orc@dept_id=1/city=London/registration_date=2024-03-11
+POSTHOOK: query: insert into ice_orc partition(dept_id=1, city='London', 
registration_date='2024-03-11') VALUES 
+('fn9','ln9'),
+('fn10','ln10')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: 
default@ice_orc@dept_id=1/city=London/registration_date=2024-03-11
+PREHOOK: query: insert into ice_orc partition(dept_id=1, city='London', 
registration_date='2024-03-11') VALUES 
+('fn11','ln11'),
+('fn12','ln12')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: 
default@ice_orc@dept_id=1/city=London/registration_date=2024-03-11
+POSTHOOK: query: insert into ice_orc partition(dept_id=1, city='London', 
registration_date='2024-03-11') VALUES 
+('fn11','ln11'),
+('fn12','ln12')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: 
default@ice_orc@dept_id=1/city=London/registration_date=2024-03-11
+PREHOOK: query: insert into ice_orc partition(dept_id=2, city='Paris', 
registration_date='2024-02-16') VALUES
+('fn13','ln13'),
+('fn14','ln14')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: 
default@ice_orc@dept_id=2/city=Paris/registration_date=2024-02-16
+POSTHOOK: query: insert into ice_orc partition(dept_id=2, city='Paris', 
registration_date='2024-02-16') VALUES
+('fn13','ln13'),
+('fn14','ln14')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: 
default@ice_orc@dept_id=2/city=Paris/registration_date=2024-02-16
+PREHOOK: query: insert into ice_orc partition(dept_id=2, city='Paris', 
registration_date='2024-02-16') VALUES
+('fn15','ln15'),
+('fn16','ln16')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: 
default@ice_orc@dept_id=2/city=Paris/registration_date=2024-02-16
+POSTHOOK: query: insert into ice_orc partition(dept_id=2, city='Paris', 
registration_date='2024-02-16') VALUES
+('fn15','ln15'),
+('fn16','ln16')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: 
default@ice_orc@dept_id=2/city=Paris/registration_date=2024-02-16
+PREHOOK: query: delete from ice_orc where last_name in ('ln1', 'ln3', 'ln5', 
'ln7', 'ln9', 'ln11', 'ln13', 'ln15')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: delete from ice_orc where last_name in ('ln1', 'ln3', 'ln5', 
'ln7', 'ln9', 'ln11', 'ln13', 'ln15')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: select * from ice_orc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_orc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+fn10   ln10    2024-03-11      1       London
+fn12   ln12    2024-03-11      1       London
+fn14   ln14    2024-02-16      2       Paris
+fn16   ln16    2024-02-16      2       Paris
+fn2    ln2     2024-03-11      1       London
+fn4    ln4     2024-03-11      1       London
+fn6    ln6     2024-02-16      2       Paris
+fn8    ln8     2024-02-16      2       Paris
+PREHOOK: query: describe formatted ice_orc
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@ice_orc
+POSTHOOK: query: describe formatted ice_orc
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@ice_orc
+# col_name             data_type               comment             
+first_name             string                                      
+last_name              string                                      
+registration_date      date                                        
+dept_id                bigint                                      
+city                   string                                      
+                
+# Partition Transform Information               
+# col_name             transform_type           
+dept_id                IDENTITY                 
+city                   IDENTITY                 
+registration_date      IDENTITY                 
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            EXTERNAL_TABLE           
+Table Parameters:               
+       COLUMN_STATS_ACCURATE   {\"BASIC_STATS\":\"true\"}
+       EXTERNAL                TRUE                
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"registration_date\",\"required\":false,\"type\":\"date\"},{\"id\":4,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"city\",\"required\":false,\"type\":\"string\"}]}
+       current-snapshot-id     #Masked#
+       current-snapshot-summary        
{\"added-position-delete-files\":\"4\",\"added-delete-files\":\"4\",\"added-files-size\":\"#Masked#\",\"added-position-deletes\":\"8\",\"changed-partition-count\":\"4\",\"total-records\":\"16\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"8\",\"total-delete-files\":\"4\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\"}
+       current-snapshot-timestamp-ms   #Masked#       
+       default-partition-spec  
{\"spec-id\":1,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1000},{\"name\":\"city\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1001},{\"name\":\"registration_date\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1002}]}
+       format-version          2                   
+       iceberg.orc.files.only  true                
+#### A masked pattern was here ####
+       numFiles                8                   
+       numRows                 8                   
+       parquet.compression     zstd                
+#### A masked pattern was here ####
+       rawDataSize             0                   
+       serialization.format    1                   
+       snapshot-count          9                   
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG             
+       totalSize               #Masked#
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       write.delete.mode       merge-on-read       
+       write.format.default    orc                 
+       write.merge.mode        merge-on-read       
+       write.update.mode       merge-on-read       
+                
+# Storage Information           
+SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
+InputFormat:           org.apache.iceberg.mr.hive.HiveIcebergInputFormat       
 
+OutputFormat:          org.apache.iceberg.mr.hive.HiveIcebergOutputFormat      
 
+Compressed:            No                       
+Sort Columns:          []                       
+PREHOOK: query: explain alter table ice_orc PARTITION (dept_id=1, 
city='London', registration_date='2024-03-11') COMPACT 'major' and wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: 
default@ice_orc@dept_id=1/city=London/registration_date=2024-03-11
+POSTHOOK: query: explain alter table ice_orc PARTITION (dept_id=1, 
city='London', registration_date='2024-03-11') COMPACT 'major' and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: 
default@ice_orc@dept_id=1/city=London/registration_date=2024-03-11
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Compact
+      compaction type: major
+      table name: default.ice_orc
+      numberOfBuckets: 0
+      partition spec:
+        city London
+        dept_id 1
+        registration_date 2024-03-11
+      table name: default.ice_orc
+      blocking: true
+
+PREHOOK: query: alter table ice_orc PARTITION (dept_id=1, city='London', 
registration_date='2024-03-11') COMPACT 'major' and wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: 
default@ice_orc@dept_id=1/city=London/registration_date=2024-03-11
+POSTHOOK: query: alter table ice_orc PARTITION (dept_id=1, city='London', 
registration_date='2024-03-11') COMPACT 'major' and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: 
default@ice_orc@dept_id=1/city=London/registration_date=2024-03-11
+PREHOOK: query: select * from ice_orc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_orc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+fn10   ln10    2024-03-11      1       London
+fn12   ln12    2024-03-11      1       London
+fn14   ln14    2024-02-16      2       Paris
+fn16   ln16    2024-02-16      2       Paris
+fn2    ln2     2024-03-11      1       London
+fn4    ln4     2024-03-11      1       London
+fn6    ln6     2024-02-16      2       Paris
+fn8    ln8     2024-02-16      2       Paris
+PREHOOK: query: describe formatted ice_orc
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@ice_orc
+POSTHOOK: query: describe formatted ice_orc
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@ice_orc
+# col_name             data_type               comment             
+first_name             string                                      
+last_name              string                                      
+registration_date      date                                        
+dept_id                bigint                                      
+city                   string                                      
+                
+# Partition Transform Information               
+# col_name             transform_type           
+dept_id                IDENTITY                 
+city                   IDENTITY                 
+registration_date      IDENTITY                 
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            EXTERNAL_TABLE           
+Table Parameters:               
+       COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"city\":\"true\",\"dept_id\":\"true\",\"first_name\":\"true\",\"last_name\":\"true\",\"registration_date\":\"true\"}}
+       EXTERNAL                TRUE                
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"registration_date\",\"required\":false,\"type\":\"date\"},{\"id\":4,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"city\",\"required\":false,\"type\":\"string\"}]}
+       current-snapshot-id     #Masked#
+       current-snapshot-summary        
{\"added-data-files\":\"1\",\"deleted-data-files\":\"2\",\"removed-position-delete-files\":\"1\",\"removed-delete-files\":\"1\",\"added-records\":\"2\",\"deleted-records\":\"4\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"2\",\"changed-partition-count\":\"1\",\"total-records\":\"14\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"7\",\"total-delete-files\":\"3\",\"total-position-deletes\":\"6\",\
 [...]
+       current-snapshot-timestamp-ms   #Masked#       
+       default-partition-spec  
{\"spec-id\":1,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1000},{\"name\":\"city\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1001},{\"name\":\"registration_date\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1002}]}
+       format-version          2                   
+       iceberg.orc.files.only  true                
+#### A masked pattern was here ####
+       numFiles                7                   
+       numRows                 8                   
+       parquet.compression     zstd                
+#### A masked pattern was here ####
+       rawDataSize             0                   
+       serialization.format    1                   
+       snapshot-count          10                  
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG             
+       totalSize               #Masked#
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       write.delete.mode       merge-on-read       
+       write.format.default    orc                 
+       write.merge.mode        merge-on-read       
+       write.update.mode       merge-on-read       
+                
+# Storage Information           
+SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
+InputFormat:           org.apache.iceberg.mr.hive.HiveIcebergInputFormat       
 
+OutputFormat:          org.apache.iceberg.mr.hive.HiveIcebergOutputFormat      
 
+Compressed:            No                       
+Sort Columns:          []                       
+PREHOOK: query: explain alter table ice_orc PARTITION (dept_id=2, 
city='Paris', registration_date='2024-02-16') COMPACT 'major' and wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: 
default@ice_orc@dept_id=2/city=Paris/registration_date=2024-02-16
+POSTHOOK: query: explain alter table ice_orc PARTITION (dept_id=2, 
city='Paris', registration_date='2024-02-16') COMPACT 'major' and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: 
default@ice_orc@dept_id=2/city=Paris/registration_date=2024-02-16
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Compact
+      compaction type: major
+      table name: default.ice_orc
+      numberOfBuckets: 0
+      partition spec:
+        city Paris
+        dept_id 2
+        registration_date 2024-02-16
+      table name: default.ice_orc
+      blocking: true
+
+PREHOOK: query: alter table ice_orc PARTITION (dept_id=2, city='Paris', 
registration_date='2024-02-16') COMPACT 'major' and wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: 
default@ice_orc@dept_id=2/city=Paris/registration_date=2024-02-16
+POSTHOOK: query: alter table ice_orc PARTITION (dept_id=2, city='Paris', 
registration_date='2024-02-16') COMPACT 'major' and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: 
default@ice_orc@dept_id=2/city=Paris/registration_date=2024-02-16
+PREHOOK: query: select * from ice_orc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_orc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+fn10   ln10    2024-03-11      1       London
+fn12   ln12    2024-03-11      1       London
+fn14   ln14    2024-02-16      2       Paris
+fn16   ln16    2024-02-16      2       Paris
+fn2    ln2     2024-03-11      1       London
+fn4    ln4     2024-03-11      1       London
+fn6    ln6     2024-02-16      2       Paris
+fn8    ln8     2024-02-16      2       Paris
+PREHOOK: query: describe formatted ice_orc
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@ice_orc
+POSTHOOK: query: describe formatted ice_orc
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@ice_orc
+# col_name             data_type               comment             
+first_name             string                                      
+last_name              string                                      
+registration_date      date                                        
+dept_id                bigint                                      
+city                   string                                      
+                
+# Partition Transform Information               
+# col_name             transform_type           
+dept_id                IDENTITY                 
+city                   IDENTITY                 
+registration_date      IDENTITY                 
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            EXTERNAL_TABLE           
+Table Parameters:               
+       COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"city\":\"true\",\"dept_id\":\"true\",\"first_name\":\"true\",\"last_name\":\"true\",\"registration_date\":\"true\"}}
+       EXTERNAL                TRUE                
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"registration_date\",\"required\":false,\"type\":\"date\"},{\"id\":4,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"city\",\"required\":false,\"type\":\"string\"}]}
+       current-snapshot-id     #Masked#
+       current-snapshot-summary        
{\"added-data-files\":\"1\",\"deleted-data-files\":\"2\",\"removed-position-delete-files\":\"1\",\"removed-delete-files\":\"1\",\"added-records\":\"2\",\"deleted-records\":\"4\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"2\",\"changed-partition-count\":\"1\",\"total-records\":\"12\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"6\",\"total-delete-files\":\"2\",\"total-position-deletes\":\"4\",\
 [...]
+       current-snapshot-timestamp-ms   #Masked#       
+       default-partition-spec  
{\"spec-id\":1,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1000},{\"name\":\"city\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1001},{\"name\":\"registration_date\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1002}]}
+       format-version          2                   
+       iceberg.orc.files.only  true                
+#### A masked pattern was here ####
+       numFiles                6                   
+       numRows                 8                   
+       parquet.compression     zstd                
+#### A masked pattern was here ####
+       rawDataSize             0                   
+       serialization.format    1                   
+       snapshot-count          11                  
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG             
+       totalSize               #Masked#
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       write.delete.mode       merge-on-read       
+       write.format.default    orc                 
+       write.merge.mode        merge-on-read       
+       write.update.mode       merge-on-read       
+                
+# Storage Information           
+SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
+InputFormat:           org.apache.iceberg.mr.hive.HiveIcebergInputFormat       
 
+OutputFormat:          org.apache.iceberg.mr.hive.HiveIcebergOutputFormat      
 
+Compressed:            No                       
+Sort Columns:          []                       
+PREHOOK: query: explain alter table ice_orc PARTITION (dept_id=1, 
city='London') COMPACT 'major' and wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc@dept_id=1/city=London
+POSTHOOK: query: explain alter table ice_orc PARTITION (dept_id=1, 
city='London') COMPACT 'major' and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc@dept_id=1/city=London
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Compact
+      compaction type: major
+      table name: default.ice_orc
+      numberOfBuckets: 0
+      partition spec:
+        city London
+        dept_id 1
+      table name: default.ice_orc
+      blocking: true
+
+PREHOOK: query: alter table ice_orc PARTITION (dept_id=1, city='London') 
COMPACT 'major' and wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc@dept_id=1/city=London
+POSTHOOK: query: alter table ice_orc PARTITION (dept_id=1, city='London') 
COMPACT 'major' and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc@dept_id=1/city=London
+PREHOOK: query: select * from ice_orc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_orc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+fn10   ln10    2024-03-11      1       London
+fn12   ln12    2024-03-11      1       London
+fn14   ln14    2024-02-16      2       Paris
+fn16   ln16    2024-02-16      2       Paris
+fn2    ln2     2024-03-11      1       London
+fn4    ln4     2024-03-11      1       London
+fn6    ln6     2024-02-16      2       Paris
+fn8    ln8     2024-02-16      2       Paris
+PREHOOK: query: describe formatted ice_orc
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@ice_orc
+POSTHOOK: query: describe formatted ice_orc
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@ice_orc
+# col_name             data_type               comment             
+first_name             string                                      
+last_name              string                                      
+registration_date      date                                        
+dept_id                bigint                                      
+city                   string                                      
+                
+# Partition Transform Information               
+# col_name             transform_type           
+dept_id                IDENTITY                 
+city                   IDENTITY                 
+registration_date      IDENTITY                 
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            EXTERNAL_TABLE           
+Table Parameters:               
+       COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"city\":\"true\",\"dept_id\":\"true\",\"first_name\":\"true\",\"last_name\":\"true\",\"registration_date\":\"true\"}}
+       EXTERNAL                TRUE                
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"registration_date\",\"required\":false,\"type\":\"date\"},{\"id\":4,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"city\",\"required\":false,\"type\":\"string\"}]}
+       current-snapshot-id     #Masked#
+       current-snapshot-summary        
{\"added-data-files\":\"1\",\"deleted-data-files\":\"2\",\"removed-position-delete-files\":\"1\",\"removed-delete-files\":\"1\",\"added-records\":\"2\",\"deleted-records\":\"4\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"2\",\"changed-partition-count\":\"2\",\"total-records\":\"10\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"5\",\"total-delete-files\":\"1\",\"total-position-deletes\":\"2\",\
 [...]
+       current-snapshot-timestamp-ms   #Masked#       
+       default-partition-spec  
{\"spec-id\":1,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1000},{\"name\":\"city\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1001},{\"name\":\"registration_date\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1002}]}
+       format-version          2                   
+       iceberg.orc.files.only  true                
+#### A masked pattern was here ####
+       numFiles                5                   
+       numRows                 8                   
+       parquet.compression     zstd                
+#### A masked pattern was here ####
+       rawDataSize             0                   
+       serialization.format    1                   
+       snapshot-count          12                  
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG             
+       totalSize               #Masked#
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       write.delete.mode       merge-on-read       
+       write.format.default    orc                 
+       write.merge.mode        merge-on-read       
+       write.update.mode       merge-on-read       
+                
+# Storage Information           
+SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
+InputFormat:           org.apache.iceberg.mr.hive.HiveIcebergInputFormat       
 
+OutputFormat:          org.apache.iceberg.mr.hive.HiveIcebergOutputFormat      
 
+Compressed:            No                       
+Sort Columns:          []                       
+PREHOOK: query: explain alter table ice_orc PARTITION (dept_id=2, 
city='Paris') COMPACT 'major' and wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc@dept_id=2/city=Paris
+POSTHOOK: query: explain alter table ice_orc PARTITION (dept_id=2, 
city='Paris') COMPACT 'major' and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc@dept_id=2/city=Paris
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Compact
+      compaction type: major
+      table name: default.ice_orc
+      numberOfBuckets: 0
+      partition spec:
+        city Paris
+        dept_id 2
+      table name: default.ice_orc
+      blocking: true
+
+PREHOOK: query: alter table ice_orc PARTITION (dept_id=2, city='Paris') 
COMPACT 'major' and wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc@dept_id=2/city=Paris
+POSTHOOK: query: alter table ice_orc PARTITION (dept_id=2, city='Paris') 
COMPACT 'major' and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc@dept_id=2/city=Paris
+PREHOOK: query: select * from ice_orc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_orc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+fn10   ln10    2024-03-11      1       London
+fn12   ln12    2024-03-11      1       London
+fn14   ln14    2024-02-16      2       Paris
+fn16   ln16    2024-02-16      2       Paris
+fn2    ln2     2024-03-11      1       London
+fn4    ln4     2024-03-11      1       London
+fn6    ln6     2024-02-16      2       Paris
+fn8    ln8     2024-02-16      2       Paris
+PREHOOK: query: describe formatted ice_orc
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@ice_orc
+POSTHOOK: query: describe formatted ice_orc
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@ice_orc
+# col_name             data_type               comment             
+first_name             string                                      
+last_name              string                                      
+registration_date      date                                        
+dept_id                bigint                                      
+city                   string                                      
+                
+# Partition Transform Information               
+# col_name             transform_type           
+dept_id                IDENTITY                 
+city                   IDENTITY                 
+registration_date      IDENTITY                 
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            EXTERNAL_TABLE           
+Table Parameters:               
+       COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"city\":\"true\",\"dept_id\":\"true\",\"first_name\":\"true\",\"last_name\":\"true\",\"registration_date\":\"true\"}}
+       EXTERNAL                TRUE                
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"registration_date\",\"required\":false,\"type\":\"date\"},{\"id\":4,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"city\",\"required\":false,\"type\":\"string\"}]}
+       current-snapshot-id     #Masked#
+       current-snapshot-summary        
{\"added-data-files\":\"1\",\"deleted-data-files\":\"2\",\"removed-position-delete-files\":\"1\",\"removed-delete-files\":\"1\",\"added-records\":\"2\",\"deleted-records\":\"4\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"2\",\"changed-partition-count\":\"2\",\"total-records\":\"8\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"4\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"
 [...]
+       current-snapshot-timestamp-ms   #Masked#       
+       default-partition-spec  
{\"spec-id\":1,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1000},{\"name\":\"city\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1001},{\"name\":\"registration_date\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1002}]}
+       format-version          2                   
+       iceberg.orc.files.only  true                
+#### A masked pattern was here ####
+       numFiles                4                   
+       numRows                 8                   
+       parquet.compression     zstd                
+#### A masked pattern was here ####
+       rawDataSize             0                   
+       serialization.format    1                   
+       snapshot-count          13                  
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG             
+       totalSize               #Masked#
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       write.delete.mode       merge-on-read       
+       write.format.default    orc                 
+       write.merge.mode        merge-on-read       
+       write.update.mode       merge-on-read       
+                
+# Storage Information           
+SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
+InputFormat:           org.apache.iceberg.mr.hive.HiveIcebergInputFormat       
 
+OutputFormat:          org.apache.iceberg.mr.hive.HiveIcebergOutputFormat      
 
+Compressed:            No                       
+Sort Columns:          []                       
+PREHOOK: query: show compactions
+PREHOOK: type: SHOW COMPACTIONS
+POSTHOOK: query: show compactions
+POSTHOOK: type: SHOW COMPACTIONS
+CompactionId   Database        Table   Partition       Type    State   Worker 
host     Worker  Enqueue Time    Start Time      Duration(ms)    HadoopJobId    
 Error message   Initiator host  Initiator       Pool name       TxnId   Next 
TxnId      Commit Time     Highest WriteId
+#Masked#       default ice_orc 
dept_id=1/city=London/registration_date=2024-03-11      MAJOR   succeeded       
#Masked#        manual  default 0       0       0        --- 
+#Masked#       default ice_orc 
dept_id=2/city=Paris/registration_date=2024-02-16       MAJOR   succeeded       
#Masked#        manual  default 0       0       0        --- 
+#Masked#       default ice_orc dept_id=1/city=London   MAJOR   succeeded       
#Masked#        manual  default 0       0       0        --- 
+#Masked#       default ice_orc dept_id=2/city=Paris    MAJOR   succeeded       
#Masked#        manual  default 0       0       0        --- 
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution2.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution2.q.out
new file mode 100644
index 00000000000..d8bc84569fb
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution2.q.out
@@ -0,0 +1,445 @@
+PREHOOK: query: create table ice_orc (
+    a string
+ )
+partitioned by (b bigint)
+stored by iceberg stored as orc 
+tblproperties ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: create table ice_orc (
+    a string
+ )
+partitioned by (b bigint)
+stored by iceberg stored as orc 
+tblproperties ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: insert into ice_orc partition(b=1) VALUES 
+('a1'),
+('a2'),
+('a3')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc@b=1
+POSTHOOK: query: insert into ice_orc partition(b=1) VALUES 
+('a1'),
+('a2'),
+('a3')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc@b=1
+PREHOOK: query: insert into ice_orc partition(b=1) VALUES
+('a4'),
+('a5'),
+('a6')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc@b=1
+POSTHOOK: query: insert into ice_orc partition(b=1) VALUES
+('a4'),
+('a5'),
+('a6')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc@b=1
+PREHOOK: query: alter table ice_orc set partition spec(a)
+PREHOOK: type: ALTERTABLE_SETPARTSPEC
+PREHOOK: Input: default@ice_orc
+POSTHOOK: query: alter table ice_orc set partition spec(a)
+POSTHOOK: type: ALTERTABLE_SETPARTSPEC
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: insert into ice_orc partition (a='a') VALUES 
+(1),
+(2),
+(3)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc@a=a
+POSTHOOK: query: insert into ice_orc partition (a='a') VALUES 
+(1),
+(2),
+(3)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc@a=a
+PREHOOK: query: insert into ice_orc partition (a='a') VALUES 
+(4),
+(5),
+(6)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc@a=a
+POSTHOOK: query: insert into ice_orc partition (a='a') VALUES 
+(4),
+(5),
+(6)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc@a=a
+PREHOOK: query: select * from ice_orc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_orc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+a      1
+a      2
+a      3
+a      4
+a      5
+a      6
+a1     1
+a2     1
+a3     1
+a4     1
+a5     1
+a6     1
+PREHOOK: query: describe formatted ice_orc
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@ice_orc
+POSTHOOK: query: describe formatted ice_orc
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@ice_orc
+# col_name             data_type               comment             
+a                      string                                      
+b                      bigint                                      
+                
+# Partition Transform Information               
+# col_name             transform_type           
+a                      IDENTITY                 
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            EXTERNAL_TABLE           
+Table Parameters:               
+       COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\"}}
+       EXTERNAL                TRUE                
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"a\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"b\",\"required\":false,\"type\":\"long\"}]}
+       current-snapshot-id     #Masked#
+       current-snapshot-summary        
{\"added-data-files\":\"1\",\"added-records\":\"3\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"1\",\"total-records\":\"12\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"4\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\"}
+       current-snapshot-timestamp-ms   #Masked#       
+       default-partition-spec  
{\"spec-id\":1,\"fields\":[{\"name\":\"a\",\"transform\":\"identity\",\"source-id\":1,\"field-id\":1001}]}
+       format-version          2                   
+       iceberg.orc.files.only  true                
+#### A masked pattern was here ####
+       numFiles                4                   
+       numRows                 12                  
+       parquet.compression     zstd                
+#### A masked pattern was here ####
+       rawDataSize             0                   
+       serialization.format    1                   
+       snapshot-count          4                   
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG             
+       totalSize               #Masked#
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       write.delete.mode       merge-on-read       
+       write.format.default    orc                 
+       write.merge.mode        merge-on-read       
+       write.update.mode       merge-on-read       
+                
+# Storage Information           
+SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
+InputFormat:           org.apache.iceberg.mr.hive.HiveIcebergInputFormat       
 
+OutputFormat:          org.apache.iceberg.mr.hive.HiveIcebergOutputFormat      
 
+Compressed:            No                       
+Sort Columns:          []                       
+PREHOOK: query: delete from ice_orc where a in ('a2', 'a4')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: delete from ice_orc where a in ('a2', 'a4')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: delete from ice_orc where b in (3, 6)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: delete from ice_orc where b in (3, 6)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: select * from ice_orc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_orc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+a      1
+a      2
+a      4
+a      5
+a1     1
+a3     1
+a5     1
+a6     1
+PREHOOK: query: describe formatted ice_orc
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@ice_orc
+POSTHOOK: query: describe formatted ice_orc
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@ice_orc
+# col_name             data_type               comment             
+a                      string                                      
+b                      bigint                                      
+                
+# Partition Transform Information               
+# col_name             transform_type           
+a                      IDENTITY                 
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            EXTERNAL_TABLE           
+Table Parameters:               
+       COLUMN_STATS_ACCURATE   {\"BASIC_STATS\":\"true\"}
+       EXTERNAL                TRUE                
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"a\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"b\",\"required\":false,\"type\":\"long\"}]}
+       current-snapshot-id     #Masked#
+       current-snapshot-summary        
{\"added-position-delete-files\":\"1\",\"added-delete-files\":\"1\",\"added-files-size\":\"#Masked#\",\"added-position-deletes\":\"2\",\"changed-partition-count\":\"1\",\"total-records\":\"12\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"4\",\"total-delete-files\":\"2\",\"total-position-deletes\":\"4\",\"total-equality-deletes\":\"0\"}
+       current-snapshot-timestamp-ms   #Masked#       
+       default-partition-spec  
{\"spec-id\":1,\"fields\":[{\"name\":\"a\",\"transform\":\"identity\",\"source-id\":1,\"field-id\":1001}]}
+       format-version          2                   
+       iceberg.orc.files.only  true                
+#### A masked pattern was here ####
+       numFiles                4                   
+       numRows                 8                   
+       parquet.compression     zstd                
+#### A masked pattern was here ####
+       rawDataSize             0                   
+       serialization.format    1                   
+       snapshot-count          6                   
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG             
+       totalSize               #Masked#
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       write.delete.mode       merge-on-read       
+       write.format.default    orc                 
+       write.merge.mode        merge-on-read       
+       write.update.mode       merge-on-read       
+                
+# Storage Information           
+SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
+InputFormat:           org.apache.iceberg.mr.hive.HiveIcebergInputFormat       
 
+OutputFormat:          org.apache.iceberg.mr.hive.HiveIcebergOutputFormat      
 
+Compressed:            No                       
+Sort Columns:          []                       
+PREHOOK: query: explain alter table ice_orc partition(a='a') compact 'major' 
and wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc@a=a
+POSTHOOK: query: explain alter table ice_orc partition(a='a') compact 'major' 
and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc@a=a
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Compact
+      compaction type: major
+      table name: default.ice_orc
+      numberOfBuckets: 0
+      partition spec:
+        a a
+      table name: default.ice_orc
+      blocking: true
+
+PREHOOK: query: alter table ice_orc partition(a='a') compact 'major' and wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc@a=a
+POSTHOOK: query: alter table ice_orc partition(a='a') compact 'major' and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc@a=a
+PREHOOK: query: select * from ice_orc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_orc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+a      1
+a      2
+a      4
+a      5
+a1     1
+a3     1
+a5     1
+a6     1
+PREHOOK: query: describe formatted ice_orc
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@ice_orc
+POSTHOOK: query: describe formatted ice_orc
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@ice_orc
+# col_name             data_type               comment             
+a                      string                                      
+b                      bigint                                      
+                
+# Partition Transform Information               
+# col_name             transform_type           
+a                      IDENTITY                 
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            EXTERNAL_TABLE           
+Table Parameters:               
+       COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\"}}
+       EXTERNAL                TRUE                
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"a\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"b\",\"required\":false,\"type\":\"long\"}]}
+       current-snapshot-id     #Masked#
+       current-snapshot-summary        
{\"added-data-files\":\"1\",\"deleted-data-files\":\"2\",\"removed-position-delete-files\":\"1\",\"removed-delete-files\":\"1\",\"added-records\":\"4\",\"deleted-records\":\"6\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"2\",\"changed-partition-count\":\"1\",\"total-records\":\"10\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"3\",\"total-delete-files\":\"1\",\"total-position-deletes\":\"2\",\
 [...]
+       current-snapshot-timestamp-ms   #Masked#       
+       default-partition-spec  
{\"spec-id\":1,\"fields\":[{\"name\":\"a\",\"transform\":\"identity\",\"source-id\":1,\"field-id\":1001}]}
+       format-version          2                   
+       iceberg.orc.files.only  true                
+#### A masked pattern was here ####
+       numFiles                3                   
+       numRows                 8                   
+       parquet.compression     zstd                
+#### A masked pattern was here ####
+       rawDataSize             0                   
+       serialization.format    1                   
+       snapshot-count          7                   
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG             
+       totalSize               #Masked#
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       write.delete.mode       merge-on-read       
+       write.format.default    orc                 
+       write.merge.mode        merge-on-read       
+       write.update.mode       merge-on-read       
+                
+# Storage Information           
+SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
+InputFormat:           org.apache.iceberg.mr.hive.HiveIcebergInputFormat       
 
+OutputFormat:          org.apache.iceberg.mr.hive.HiveIcebergOutputFormat      
 
+Compressed:            No                       
+Sort Columns:          []                       
+PREHOOK: query: explain alter table ice_orc partition(b=1) compact 'major' and 
wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc@b=1
+POSTHOOK: query: explain alter table ice_orc partition(b=1) compact 'major' 
and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc@b=1
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Compact
+      compaction type: major
+      table name: default.ice_orc
+      numberOfBuckets: 0
+      partition spec:
+        b 1
+      table name: default.ice_orc
+      blocking: true
+
+PREHOOK: query: alter table ice_orc partition(b=1) compact 'major' and wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc@b=1
+POSTHOOK: query: alter table ice_orc partition(b=1) compact 'major' and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc@b=1
+PREHOOK: query: select * from ice_orc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_orc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+#### A masked pattern was here ####
+a      1
+a      2
+a      4
+a      5
+a1     1
+a3     1
+a5     1
+a6     1
+PREHOOK: query: describe formatted ice_orc
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@ice_orc
+POSTHOOK: query: describe formatted ice_orc
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@ice_orc
+# col_name             data_type               comment             
+a                      string                                      
+b                      bigint                                      
+                
+# Partition Transform Information               
+# col_name             transform_type           
+a                      IDENTITY                 
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            EXTERNAL_TABLE           
+Table Parameters:               
+       COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\"}}
+       EXTERNAL                TRUE                
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"a\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"b\",\"required\":false,\"type\":\"long\"}]}
+       current-snapshot-id     #Masked#
+       current-snapshot-summary        
{\"added-data-files\":\"4\",\"deleted-data-files\":\"2\",\"removed-position-delete-files\":\"1\",\"removed-delete-files\":\"1\",\"added-records\":\"4\",\"deleted-records\":\"6\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"2\",\"changed-partition-count\":\"5\",\"total-records\":\"8\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"5\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"
 [...]
+       current-snapshot-timestamp-ms   #Masked#       
+       default-partition-spec  
{\"spec-id\":1,\"fields\":[{\"name\":\"a\",\"transform\":\"identity\",\"source-id\":1,\"field-id\":1001}]}
+       format-version          2                   
+       iceberg.orc.files.only  true                
+#### A masked pattern was here ####
+       numFiles                5                   
+       numRows                 8                   
+       parquet.compression     zstd                
+#### A masked pattern was here ####
+       rawDataSize             0                   
+       serialization.format    1                   
+       snapshot-count          8                   
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG             
+       totalSize               #Masked#
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       write.delete.mode       merge-on-read       
+       write.format.default    orc                 
+       write.merge.mode        merge-on-read       
+       write.update.mode       merge-on-read       
+                
+# Storage Information           
+SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
+InputFormat:           org.apache.iceberg.mr.hive.HiveIcebergInputFormat       
 
+OutputFormat:          org.apache.iceberg.mr.hive.HiveIcebergOutputFormat      
 
+Compressed:            No                       
+Sort Columns:          []                       
diff --git a/itests/src/test/resources/testconfiguration.properties 
b/itests/src/test/resources/testconfiguration.properties
index 8e91c7a73a6..572de056f04 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -427,6 +427,8 @@ iceberg.llap.query.compactor.files=\
   iceberg_major_compaction_query_metadata.q,\
   iceberg_major_compaction_schema_evolution.q,\
   iceberg_major_compaction_single_partition.q,\
+  iceberg_major_compaction_single_partition_with_evolution.q,\
+  iceberg_major_compaction_single_partition_with_evolution2.q,\
   iceberg_major_compaction_unpartitioned.q,\
   iceberg_optimize_table_unpartitioned.q
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java 
b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index fd0638737df..8634a55cc3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -33,7 +33,6 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.base.Preconditions;
 import org.antlr.runtime.TokenRewriteStream;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.lang3.tuple.Pair;
@@ -256,10 +255,13 @@ public class Context {
   public enum RewritePolicy {
 
     DEFAULT,
-    ALL_PARTITIONS;
+    ALL_PARTITIONS,
+    PARTITION;
 
     public static RewritePolicy fromString(String rewritePolicy) {
-      Preconditions.checkArgument(null != rewritePolicy, "Invalid rewrite 
policy: null");
+      if (rewritePolicy == null) {
+        return DEFAULT;
+      }
 
       try {
         return valueOf(rewritePolicy.toUpperCase(Locale.ENGLISH));
@@ -267,6 +269,10 @@ public class Context {
         throw new IllegalArgumentException(String.format("Invalid rewrite 
policy: %s", rewritePolicy), var2);
       }
     }
+
+    public static RewritePolicy get(HiveConf conf) {
+      return fromString(conf.get(HiveConf.ConfVars.REWRITE_POLICY.varname));
+    }
   }
   private String getMatchedText(ASTNode n) {
     return getTokenRewriteStream().toString(n.getTokenStartIndex(), 
n.getTokenStopIndex() + 1).trim();
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableAnalyzer.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableAnalyzer.java
index 17b132ca130..fbdf1f21510 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableAnalyzer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableAnalyzer.java
@@ -21,7 +21,9 @@ package org.apache.hadoop.hive.ql.ddl.table;
 import java.util.Map;
 
 import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.ddl.DDLDesc.DDLDescWithWriteId;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -55,6 +57,9 @@ public abstract class AbstractAlterTableAnalyzer extends 
AbstractBaseAlterTableA
       if (command.getType() == HiveParser.TOK_ALTERTABLE_RENAMEPART) {
         partitionSpec = getPartSpec(partitionSpecNode);
       } else {
+        if (command.getType() == HiveParser.TOK_ALTERTABLE_COMPACT) {
+          HiveConf.setVar(conf, HiveConf.ConfVars.REWRITE_POLICY, 
Context.RewritePolicy.PARTITION.name());
+        }
         partitionSpec = getValidatedPartSpec(getTable(tableName), 
partitionSpecNode, conf, false);
       }
     }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
index c2daa1200d9..ee67cc9a4e4 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
@@ -44,6 +44,7 @@ import java.util.ArrayList;
 import java.util.Map;
 import java.util.LinkedHashMap;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.hive.ql.io.AcidUtils.compactionTypeStr2ThriftType;
 
@@ -141,11 +142,15 @@ public class AlterTableCompactOperation extends 
DDLOperation<AlterTableCompactDe
     } else {
       Map<String, String> partitionSpec = desc.getPartitionSpec();
       partitions = context.getDb().getPartitions(table, partitionSpec);
-      if (partitions.size() > 1) {
-        throw new HiveException(ErrorMsg.TOO_MANY_COMPACTION_PARTITIONS);
-      } else if (partitions.isEmpty()) {
+      if (partitions.isEmpty()) {
         throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
       }
+      // This validates that the partition spec given in the compaction 
command matches exactly one partition 
+      // in the table, not a partial partition spec.
+      partitions = partitions.stream().filter(part -> part.getSpec().size() == 
partitionSpec.size()).collect(Collectors.toList());
+      if (partitions.size() != 1) {
+        throw new HiveException(ErrorMsg.TOO_MANY_COMPACTION_PARTITIONS);
+      } 
     }
     return partitions;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index d3c27fb10be..e688e3f5fdf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -123,6 +123,7 @@ import 
org.apache.hadoop.hive.metastore.api.UpdateTransactionalStatsRequest;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.utils.RetryUtilities;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.metastore.HiveMetaException;
@@ -3664,7 +3665,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
   public Partition getPartition(Table tbl, Map<String, String> partSpec) 
throws HiveException {
     if (tbl.getStorageHandler() != null && 
tbl.getStorageHandler().alwaysUnpartitioned()) {
-      return tbl.getStorageHandler().getPartition(tbl, partSpec);
+      return tbl.getStorageHandler().getPartition(tbl, partSpec, 
Context.RewritePolicy.get(conf));
     } else {
       return getPartition(tbl, partSpec, false);
     }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index b76f5f9ecd2..3f4278ace87 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -741,8 +741,24 @@ public interface HiveStorageHandler extends Configurable {
     throw new UnsupportedOperationException("Storage handler does not support 
show partitions command");
   }
 
+  /**
+   * Validates that the provided partitionSpec is valid according to the 
current table partitioning.
+   * @param hmsTable {@link org.apache.hadoop.hive.ql.metadata.Table} table 
metadata stored in Hive Metastore
+   * @param partitionSpec Map of Strings {@link java.util.Map} partition 
specification
+   */
   default void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table 
hmsTable, Map<String, String> partitionSpec)
       throws SemanticException {
+    validatePartSpec(hmsTable, partitionSpec, Context.RewritePolicy.DEFAULT);
+  }
+
+  /**
+   * Validates that the provided partitionSpec is valid according to the 
current table partitioning.
+   * @param hmsTable {@link org.apache.hadoop.hive.ql.metadata.Table} table 
metadata stored in Hive Metastore
+   * @param partitionSpec Map of Strings {@link java.util.Map} partition 
specification
+   * @param policy {@link org.apache.hadoop.hive.ql.Context.RewritePolicy} 
compaction rewrite policy
+   */
+  default void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table 
hmsTable, Map<String, String> partitionSpec,
+      Context.RewritePolicy policy) throws SemanticException {
     throw new UnsupportedOperationException("Storage handler does not support 
validation of partition values");
   }
 
@@ -798,6 +814,19 @@ public interface HiveStorageHandler extends Configurable {
    */
   default Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table 
table, Map<String, String> partitionSpec)
       throws SemanticException {
+    return getPartition(table, partitionSpec, Context.RewritePolicy.DEFAULT);
+  }
+
+  /**
+   * Returns partition based on table and partition specification.
+   * @param table {@link org.apache.hadoop.hive.ql.metadata.Table} table 
metadata stored in Hive Metastore
+   * @param partitionSpec Map of Strings {@link java.util.Map} partition 
specification
+   * @param policy {@link org.apache.hadoop.hive.ql.Context.RewritePolicy} 
compaction rewrite policy
+   * @return Partition {@link org.apache.hadoop.hive.ql.metadata.Partition}
+   * @throws SemanticException {@link 
org.apache.hadoop.hive.ql.parse.SemanticException}
+   */
+  default Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table 
table, Map<String, String> partitionSpec,
+      Context.RewritePolicy policy) throws SemanticException {
     throw new UnsupportedOperationException("Storage handler does not support 
getting partition for a table.");
   }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index f767e491eeb..fc87e9d736c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -1707,7 +1707,7 @@ public abstract class BaseSemanticAnalyzer {
   public static void validatePartSpec(Table tbl, Map<String, String> partSpec,
       ASTNode astNode, HiveConf conf, boolean shouldBeFull) throws 
SemanticException {
     if (tbl.getStorageHandler() != null && 
tbl.getStorageHandler().alwaysUnpartitioned()) {
-      tbl.getStorageHandler().validatePartSpec(tbl, partSpec);
+      tbl.getStorageHandler().validatePartSpec(tbl, partSpec, 
Context.RewritePolicy.get(conf));
     } else {
       tbl.validatePartColumnNames(partSpec, shouldBeFull);
       validatePartColumnType(tbl, partSpec, astNode, conf);

Reply via email to