SourabhBadhya commented on code in PR #5248:
URL: https://github.com/apache/hive/pull/5248#discussion_r1621801231


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java:
##########
@@ -68,7 +84,38 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
       Map<String, String> partSpecMap = new LinkedHashMap<>();
       Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null);
 
-      List<FieldSchema> partitionKeys = table.getStorageHandler().getPartitionKeys(table);
+      Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
+      Expression expression = IcebergTableUtil.generateExpressionFromPartitionSpec(icebergTable, partSpecMap);
+      PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
+          .createMetadataTableInstance(icebergTable, MetadataTableType.PARTITIONS);
+      List<Pair<PartitionData, Integer>> partitionList = Lists.newArrayList();
+      try (CloseableIterable<FileScanTask> fileScanTasks = partitionsTable.newScan().planFiles()) {
+        fileScanTasks.forEach(task ->
+            partitionList.addAll(Sets.newHashSet(CloseableIterable.transform(task.asDataTask().rows(), row -> {
+              StructProjection data = row.get(IcebergTableUtil.PART_IDX, StructProjection.class);
+              Integer specId = row.get(IcebergTableUtil.SPEC_IDX, Integer.class);
+              return Pair.of(IcebergTableUtil
+                  .toPartitionData(data, Partitioning.partitionType(icebergTable),
+                      icebergTable.specs().get(specId).partitionType()), specId);
+            })).stream()
+                .filter(pair -> {
+                  ResidualEvaluator resEval = ResidualEvaluator.of(icebergTable.specs().get(pair.second()),
+                      expression, false);
+                  return resEval.residualFor(pair.first()).isEquivalentTo(Expressions.alwaysTrue()) &&
+                      pair.first().size() == partSpecMap.size();
+                })
+                .collect(Collectors.toSet())));
+      }
+
+      if (partitionList.isEmpty()) {
+        throw new HiveException("Invalid partition spec, no corresponding spec_id found");
+      }
+
+      int specId = partitionList.get(0).second();

Review Comment:
   Should we add a check that this list contains exactly one partition, and throw an exception otherwise?
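   Something along these lines, as a rough sketch (reusing the variables from the hunk above; the exception message is only illustrative):

   ```java
   // Fail fast when the spec resolves to more than one partition instead of
   // silently compacting partitionList.get(0). The empty case is already handled above.
   if (partitionList.size() > 1) {
     throw new HiveException("Partition spec maps to " + partitionList.size() + " partitions, expected exactly one");
   }
   int specId = partitionList.get(0).second();
   ```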



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorContext.java:
##########
@@ -30,6 +30,9 @@
  */
 public class CompactorContext {
 
+  public static final String COMPACTION_PART_SPEC_ID = "compaction_part_spec_id";
+  public static final String COMPACTION_PARTITION_PATH = "compaction_partition_path";
+

Review Comment:
   I think it's better to move these constants to an Iceberg-specific class, since they are currently used only in Iceberg.
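   For example, a dedicated holder on the Iceberg side (class name is just a suggestion):

   ```java
   // Hypothetical Iceberg-side home for these keys, e.g. in the iceberg-handler module,
   // so that ql does not need to carry Iceberg-only constants.
   public final class IcebergCompactionConstants {
     public static final String COMPACTION_PART_SPEC_ID = "compaction_part_spec_id";
     public static final String COMPACTION_PARTITION_PATH = "compaction_partition_path";

     private IcebergCompactionConstants() {
     }
   }
   ```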



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -558,33 +577,71 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
   }
 
   /**
-   * Creates and commits an Iceberg insert overwrite change with the provided data files.
-   * For unpartitioned tables the table content is replaced with the new data files. If not data files are provided
-   * then the unpartitioned table is truncated.
-   * For partitioned tables the relevant partitions are replaced with the new data files. If no data files are provided
-   * then the unpartitioned table remains unchanged.
-   * @param table The table we are changing
-   * @param startTime The start time of the commit - used only for logging
-   * @param results The object containing the new files
-   * @param rewritePolicy The rewrite policy to use for the insert overwrite commit
+   * Creates and commits an Iceberg compaction change with the provided data files.
+   * Either full table or a selected partition contents is replaced with compacted files.
+   *
+   * @param table                   The table we are changing
+   * @param startTime               The start time of the commit - used only for logging
+   * @param results                 The object containing the new files
+   * @param rewritePolicy           The rewrite policy to use for the insert overwrite commit
+   * @param compactionPartSpecId        The table spec_id for partition compaction operation
+   * @param compactionPartitionPath The path of the compacted partition
    */
-  private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results,
-      RewritePolicy rewritePolicy) {
+  private void commitCompaction(Table table, long startTime, FilesForCommit results,
+      RewritePolicy rewritePolicy, Integer compactionPartSpecId, String compactionPartitionPath) {
     Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite");
     if (!results.dataFiles().isEmpty()) {
-      Transaction transaction = table.newTransaction();
       if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) {
+        Transaction transaction = table.newTransaction();
         DeleteFiles delete = transaction.newDelete();
         delete.deleteFromRowFilter(Expressions.alwaysTrue());
         delete.commit();
+        ReplacePartitions overwrite = transaction.newReplacePartitions();
+        results.dataFiles().forEach(overwrite::addFile);
+        overwrite.commit();
+        transaction.commitTransaction();
+      } else {
+        Pair<List<DataFile>, List<DeleteFile>> existingFiles = IcebergTableUtil.getDataAndDeleteFiles(table,
+            compactionPartSpecId, compactionPartitionPath);
+
+        RewriteFiles rewriteFiles = table.newRewrite();
+        rewriteFiles.validateFromSnapshot(table.currentSnapshot().snapshotId());
+
+        existingFiles.first().forEach(rewriteFiles::deleteFile);
+        existingFiles.second().forEach(rewriteFiles::deleteFile);
+        results.dataFiles().forEach(rewriteFiles::addFile);
+
+        rewriteFiles.commit();
       }
-      ReplacePartitions overwrite = transaction.newReplacePartitions();
+      LOG.info("Compaction commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime,
+          table, results.dataFiles().size());
+    } else {
+      LOG.info("Empty compaction commit, took {} ms for table: {}", System.currentTimeMillis() - startTime, table);
+    }
+
+    LOG.debug("Compacted partitions with files {}", results);
+  }
+
+  /**
+   * Creates and commits an Iceberg insert overwrite change with the provided data files.
+   * For unpartitioned tables the table content is replaced with the new data files. If not data files are provided
+   * then the unpartitioned table is truncated.
+   * For partitioned tables the relevant partitions are replaced with the new data files. If no data files are provided
+   * then the unpartitioned table remains unchanged.
+   *
+   * @param table                   The table we are changing
+   * @param startTime               The start time of the commit - used only for logging
+   * @param results                 The object containing the new files
+   */
+  private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results) {
+    Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite");

Review Comment:
   What is the idea behind this check?



##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java:
##########
@@ -141,11 +142,13 @@ private List<Partition> getPartitions(Table table, AlterTableCompactDesc desc, D
     } else {
       Map<String, String> partitionSpec = desc.getPartitionSpec();
       partitions = context.getDb().getPartitions(table, partitionSpec);
-      if (partitions.size() > 1) {
-        throw new HiveException(ErrorMsg.TOO_MANY_COMPACTION_PARTITIONS);
-      } else if (partitions.isEmpty()) {
+      if (partitions.isEmpty()) {
         throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
       }
+      partitions = partitions.stream().filter(part -> part.getSpec().size() == partitionSpec.size()).collect(Collectors.toList());

Review Comment:
   Shouldn't this ideally be part of the `getPartitions` call itself? If we pass an exact partition spec, shouldn't `getPartitions` return only the exactly matching partition rather than the partial matches?
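   Roughly what I had in mind, as a sketch only (helper name and signature are illustrative; `context.getDb()` is assumed to return `Hive` as in the current code):

   ```java
   // Resolve only the exact partition(s) for the given spec inside the lookup itself,
   // instead of post-filtering partial matches at the call site.
   private List<Partition> getExactPartitions(Hive db, Table table, Map<String, String> partitionSpec)
       throws HiveException {
     List<Partition> exactPartitions = db.getPartitions(table, partitionSpec).stream()
         .filter(part -> part.getSpec().size() == partitionSpec.size())
         .collect(Collectors.toList());
     if (exactPartitions.isEmpty()) {
       throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
     }
     return exactPartitions;
   }
   ```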



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -558,33 +577,71 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
   }
 
   /**
-   * Creates and commits an Iceberg insert overwrite change with the provided data files.
-   * For unpartitioned tables the table content is replaced with the new data files. If not data files are provided
-   * then the unpartitioned table is truncated.
-   * For partitioned tables the relevant partitions are replaced with the new data files. If no data files are provided
-   * then the unpartitioned table remains unchanged.
-   * @param table The table we are changing
-   * @param startTime The start time of the commit - used only for logging
-   * @param results The object containing the new files
-   * @param rewritePolicy The rewrite policy to use for the insert overwrite commit
+   * Creates and commits an Iceberg compaction change with the provided data files.
+   * Either full table or a selected partition contents is replaced with compacted files.
+   *
+   * @param table                   The table we are changing
+   * @param startTime               The start time of the commit - used only for logging
+   * @param results                 The object containing the new files
+   * @param rewritePolicy           The rewrite policy to use for the insert overwrite commit
+   * @param compactionPartSpecId        The table spec_id for partition compaction operation
+   * @param compactionPartitionPath The path of the compacted partition
    */
-  private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results,
-      RewritePolicy rewritePolicy) {
+  private void commitCompaction(Table table, long startTime, FilesForCommit results,
+      RewritePolicy rewritePolicy, Integer compactionPartSpecId, String compactionPartitionPath) {
     Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite");
     if (!results.dataFiles().isEmpty()) {
-      Transaction transaction = table.newTransaction();
       if (rewritePolicy == RewritePolicy.ALL_PARTITIONS) {
+        Transaction transaction = table.newTransaction();
        DeleteFiles delete = transaction.newDelete();
         delete.deleteFromRowFilter(Expressions.alwaysTrue());
         delete.commit();
+        ReplacePartitions overwrite = transaction.newReplacePartitions();
+        results.dataFiles().forEach(overwrite::addFile);
+        overwrite.commit();
+        transaction.commitTransaction();
+      } else {
+        Pair<List<DataFile>, List<DeleteFile>> existingFiles = IcebergTableUtil.getDataAndDeleteFiles(table,
+            compactionPartSpecId, compactionPartitionPath);
+
+        RewriteFiles rewriteFiles = table.newRewrite();
+        rewriteFiles.validateFromSnapshot(table.currentSnapshot().snapshotId());
+
+        existingFiles.first().forEach(rewriteFiles::deleteFile);
+        existingFiles.second().forEach(rewriteFiles::deleteFile);
+        results.dataFiles().forEach(rewriteFiles::addFile);
+
+        rewriteFiles.commit();
       }
-      ReplacePartitions overwrite = transaction.newReplacePartitions();
+      LOG.info("Compaction commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime,
+          table, results.dataFiles().size());
+    } else {
+      LOG.info("Empty compaction commit, took {} ms for table: {}", System.currentTimeMillis() - startTime, table);

Review Comment:
   Does compaction ever reach this statement? Also, the log statement seems misleading: no commit has actually happened on the table when there are 0 files.
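   If this branch is reachable, maybe make it explicit that nothing was committed, e.g. (sketch, wording is just a suggestion):

   ```java
   // Nothing was committed here: the compaction produced no data files.
   LOG.info("No data files produced by compaction for table: {}, skipping commit ({} ms)",
       table, System.currentTimeMillis() - startTime);
   ```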



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -481,7 +484,23 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
           .map(x -> x.getJobConf().get(ConfVars.REWRITE_POLICY.varname))
           .orElse(RewritePolicy.DEFAULT.name()));
 
-      commitOverwrite(table, branchName, startTime, filesForCommit, rewritePolicy);
+      Integer compactionPartSpecId = outputTable.jobContexts.stream()
+          .findAny()
+          .map(x -> x.getJobConf().get(CompactorContext.COMPACTION_PART_SPEC_ID))
+          .map(Integer::valueOf)
+          .orElse(null);
+
+      String compactionPartitionPath = outputTable.jobContexts.stream()
+          .findAny()
+          .map(x -> x.getJobConf().get(CompactorContext.COMPACTION_PARTITION_PATH))
+          .orElse(null);
+
+      if (rewritePolicy != RewritePolicy.DEFAULT || compactionPartSpecId != null) {

Review Comment:
   What is the rewrite policy in this case? I see only two enum values, DEFAULT and ALL_PARTITIONS. Is there any chance this can be `null`?



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java:
##########
@@ -81,11 +128,12 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
           .filter(col -> !partSpecMap.containsKey(col))
           .collect(Collectors.joining(","));
 
-      compactionQuery = String.format("insert overwrite table %1$s partition(%2$s) select %4$s from %1$s where %3$s",
+      compactionQuery = String.format("insert overwrite table %1$s partition(%2$s) " +
+              "select %4$s from %1$s where %3$s and partition__spec__id = %5$d",

Review Comment:
   Can we try to use `VirtualColumn.PARTITION_SPEC_ID.getName()` instead of `partition__spec__id`?
   This would indicate that we are using a virtual column.
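   Sketch of the format-string change only (the argument list stays as in the hunk above; HiveQL identifiers are case-insensitive, so `getName()` can be concatenated into the query text directly):

   ```java
   // Reference the virtual column through its definition instead of a hard-coded literal.
   compactionQuery = String.format("insert overwrite table %1$s partition(%2$s) " +
           "select %4$s from %1$s where %3$s and " + VirtualColumn.PARTITION_SPEC_ID.getName() + " = %5$d",
   ```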



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -1868,7 +1872,8 @@ public void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
     }
 
     Map<String, Types.NestedField> mapOfPartColNamesWithTypes = Maps.newHashMap();
-    for (PartitionField partField : table.spec().fields()) {
+    List<PartitionField> allPartFields = IcebergTableUtil.getAllPartitionFields(table);

Review Comment:
   `getAllPartitionFields` essentially returns all partition columns across the table's different specs, whereas the `validatePartSpec` API is used in many places where the current table spec is expected. Hence I think this is incorrect.
   
   Doing this might allow
   `insert into table <tableName> partition (previous partition specs) ... `
   which should not be allowed.
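   To illustrate the difference I mean (a sketch of the semantics, not the actual implementation of `getAllPartitionFields`):

   ```java
   // Current (latest) spec only - what validatePartSpec callers typically expect:
   List<PartitionField> currentFields = table.spec().fields();

   // Fields across all specs, including historical ones - what getAllPartitionFields effectively returns:
   List<PartitionField> allFields = table.specs().values().stream()
       .flatMap(spec -> spec.fields().stream())
       .distinct()
       .collect(Collectors.toList());
   ```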



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

