This is an automated email from the ASF dual-hosted git repository.

difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 39b9ef131d9 HIVE-29437: Iceberg: Fix concurrency issues between compaction and concurrent write operations. (#6292)
39b9ef131d9 is described below

commit 39b9ef131d9beb5381d4274b9d9b5d913701e36b
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Tue Feb 10 13:13:37 2026 -0500

    HIVE-29437: Iceberg: Fix concurrency issues between compaction and concurrent write operations. (#6292)
    
    Co-authored-by: Dmitriy Fingerman <[email protected]>
---
 .../mr/hive/HiveIcebergOutputCommitter.java        |   5 +-
 .../mr/hive/compaction/IcebergCompactionUtil.java  |  18 +--
 .../mr/hive/compaction/IcebergTableOptimizer.java  | 141 ++++++++++++++-------
 3 files changed, 109 insertions(+), 55 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index f46133d0f9d..348fdea28ba 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -612,9 +612,10 @@ private void commit(SnapshotUpdate<?> update) {
    */
   private void commitCompaction(Table table, Long snapshotId, long startTime, FilesForCommit results,
       String partitionPath, long fileSizeThreshold) {
-    List<DataFile> existingDataFiles = IcebergCompactionUtil.getDataFiles(table, partitionPath, fileSizeThreshold);
+    List<DataFile> existingDataFiles =
+        IcebergCompactionUtil.getDataFiles(table, snapshotId, partitionPath, fileSizeThreshold);
     List<DeleteFile> existingDeleteFiles = fileSizeThreshold == -1 ?
-        IcebergCompactionUtil.getDeleteFiles(table, partitionPath) : Collections.emptyList();
+        IcebergCompactionUtil.getDeleteFiles(table, snapshotId, partitionPath) : Collections.emptyList();
 
     RewriteFiles rewriteFiles = table.newRewrite();
     existingDataFiles.forEach(rewriteFiles::deleteFile);
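
Background on the hunk above: the files to rewrite are now resolved against the snapshot the compaction was planned on (snapshotId) rather than the table's current snapshot at commit time, so data and delete files committed by concurrent writers after that snapshot are not swept into the rewrite. A minimal sketch of the pinned-snapshot scan this relies on, mirroring the new getDataFiles() body further below (table and snapshotId assumed in scope; illustrative only):

    // Plan the scan against the pinned snapshot instead of table.currentSnapshot().
    CloseableIterable<ScanTask> scanTasks = table.newBatchScan()
        .useSnapshot(snapshotId)
        .planFiles();
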
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java
index 63f1c3edf21..fca301f870d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java
@@ -22,7 +22,6 @@
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PositionDeletesScanTask;
@@ -69,15 +68,16 @@ public static boolean shouldIncludeForCompaction(Table table, String partitionPa
    * @param table the iceberg table
    * @param partitionPath partition path
    */
-  public static List<DataFile> getDataFiles(Table table, String partitionPath, long fileSizeThreshold) {
-    CloseableIterable<FileScanTask> fileScanTasks =
-        table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
-    CloseableIterable<FileScanTask> filteredFileScanTasks =
-        CloseableIterable.filter(fileScanTasks, t -> {
+  public static List<DataFile> getDataFiles(Table table, Long snapshotId, String partitionPath,
+      long fileSizeThreshold) {
+    CloseableIterable<ScanTask> scanTasks =
+        table.newBatchScan().useSnapshot(snapshotId).planFiles();
+    CloseableIterable<ScanTask> filteredScanTasks =
+        CloseableIterable.filter(scanTasks, t -> {
           DataFile file = t.asFileScanTask().file();
           return shouldIncludeForCompaction(table, partitionPath, file, fileSizeThreshold);
         });
-    return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
+    return Lists.newArrayList(CloseableIterable.transform(filteredScanTasks, t -> t.asFileScanTask().file()));
   }
 
   /**
@@ -88,10 +88,10 @@ public static List<DataFile> getDataFiles(Table table, String partitionPath, lon
    * @param table the iceberg table
    * @param partitionPath partition path
    */
-  public static List<DeleteFile> getDeleteFiles(Table table, String partitionPath) {
+  public static List<DeleteFile> getDeleteFiles(Table table, Long snapshotId, String partitionPath) {
     Table deletesTable =
         MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
-    CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
+    CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().useSnapshot(snapshotId).planFiles();
     CloseableIterable<ScanTask> filteredDeletesScanTasks =
         CloseableIterable.filter(deletesScanTasks, t -> {
           DeleteFile file = ((PositionDeletesScanTask) t).file();
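
The position-delete lookup is pinned the same way: the POSITION_DELETES metadata table is scanned at the compaction's snapshot, so delete files written by concurrent transactions are not dropped by the rewrite. A short sketch mirroring the new getDeleteFiles() body above (table and snapshotId assumed in scope; illustrative only):

    // Scan the POSITION_DELETES metadata table at the pinned snapshot.
    Table deletesTable =
        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
    CloseableIterable<ScanTask> deletesScanTasks =
        deletesTable.newBatchScan().useSnapshot(snapshotId).planFiles();
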
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
index dfc352ee0da..5c612bd0479 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
@@ -19,15 +19,18 @@
 package org.apache.iceberg.mr.hive.compaction;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -96,13 +99,13 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac
       if (hasNewCommits(icebergTable, snapshotTimeMilCache.get(qualifiedTableName))) {
         if (icebergTable.spec().isPartitioned()) {
           List<org.apache.hadoop.hive.ql.metadata.Partition> partitions = findModifiedPartitions(hiveTable,
-              icebergTable, snapshotTimeMilCache.get(qualifiedTableName), true);
+              icebergTable, snapshotTimeMilCache.get(qualifiedTableName));
 
           partitions.forEach(partition -> addCompactionTargetIfEligible(table, icebergTable,
               partition.getName(), compactionTargets, currentCompactions, skipDBs, skipTables));
 
-          if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable) && !findModifiedPartitions(hiveTable,
-              icebergTable, snapshotTimeMilCache.get(qualifiedTableName), false).isEmpty()) {
+          if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable) && hasModifiedPartitions(icebergTable,
+              snapshotTimeMilCache.get(qualifiedTableName))) {
             addCompactionTargetIfEligible(table, icebergTable,
                 null, compactionTargets, currentCompactions, skipDBs, skipTables);
           }
@@ -160,58 +163,108 @@ private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table
     compactions.add(ci);
   }
 
-  /**
-   * Finds all unique non-compaction-modified partitions (with added or deleted files) between a given past
-   * snapshot ID and the table's current (latest) snapshot.
-   * @param hiveTable The {@link org.apache.hadoop.hive.ql.metadata.Table} instance to inspect.
-   * @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot to check from (exclusive).
-   * @param latestSpecOnly when True, returns partitions with the current spec only;
-   *                       False - older specs only;
-   *                       Null - any spec
-   * @return A List of {@link org.apache.hadoop.hive.ql.metadata.Partition} representing the unique modified
-   *                       partition names.
-   * @throws IllegalArgumentException if snapshot IDs are invalid or out of order, or if the table has no current
-   *                       snapshot.
-   */
-  private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadata.Table hiveTable,
-      org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil, Boolean latestSpecOnly) {
+  private <R> R findModifiedPartitionsInternal(
+      org.apache.iceberg.Table icebergTable,
+      Long pastSnapshotTimeMil,
+      Boolean latestSpecOnly,
+      Supplier<R> resultSupplier,
+      BiFunction<R, Set<String>, Boolean> resultConsumer
+  ) {
 
-    List<Snapshot> relevantSnapshots = getRelevantSnapshots(icebergTable, pastSnapshotTimeMil).toList();
+    List<Snapshot> relevantSnapshots =
+        getRelevantSnapshots(icebergTable, pastSnapshotTimeMil).toList();
+
+    R result = resultSupplier.get();
     if (relevantSnapshots.isEmpty()) {
-      return Collections.emptyList();
+      return result;
     }
 
+    List<Callable<Set<String>>> tasks =
+        createPartitionNameTasks(icebergTable, relevantSnapshots, latestSpecOnly);
+
     try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
-      // Submit a task for each snapshot and collect the Futures
-      List<Future<Set<String>>> futures = relevantSnapshots.stream()
-          .map(snapshot -> executor.submit(() -> {
-            FileIO io = icebergTable.io();
-            List<ContentFile<?>> affectedFiles = FluentIterable.<ContentFile<?>>concat(
-                    snapshot.addedDataFiles(io),
-                    snapshot.removedDataFiles(io),
-                    snapshot.addedDeleteFiles(io),
-                    snapshot.removedDeleteFiles(io))
-                .toList();
-            return IcebergTableUtil.getPartitionNames(icebergTable, affectedFiles, latestSpecOnly);
-          }))
-          .toList();
-
-      // Collect the results from all completed futures
-      Set<String> modifiedPartitions = Sets.newHashSet();
-      for (Future<Set<String>> future : futures) {
-        modifiedPartitions.addAll(future.get());
+      CompletionService<Set<String>> cs = new ExecutorCompletionService<>(executor);
+
+      // submit tasks
+      for (Callable<Set<String>> task : tasks) {
+        cs.submit(task);
       }
 
-      return IcebergTableUtil.convertNameToMetastorePartition(hiveTable, modifiedPartitions);
+      // process results
+      for (int i = 0; i < tasks.size(); i++) {
+        if (resultConsumer.apply(result, cs.take().get())) {
+          return (R) Boolean.TRUE; // short-circuit
+        }
+      }
+
+      return result;
+
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      throw new RuntimeMetaException(e, "Interrupted while finding modified 
partitions");
+      throw new RuntimeMetaException(
+          e, "Interrupted while processing modified partitions");
     } catch (ExecutionException e) {
-      // Just wrap this one in a runtime exception
-      throw new RuntimeMetaException(e, "Failed to find modified partitions in 
parallel");
+      throw new RuntimeMetaException(
+          e, "Failed to process modified partitions in parallel");
     }
   }
 
+  private boolean hasModifiedPartitions(
+      org.apache.iceberg.Table icebergTable,
+      Long pastSnapshotTimeMil) {
+
+    return findModifiedPartitionsInternal(
+        icebergTable,
+        pastSnapshotTimeMil,
+        false,
+        () -> false,
+        (ignored, partitions) -> !partitions.isEmpty()
+    );
+  }
+
+  private List<Partition> findModifiedPartitions(
+      org.apache.hadoop.hive.ql.metadata.Table hiveTable,
+      org.apache.iceberg.Table icebergTable,
+      Long pastSnapshotTimeMil) {
+
+    Set<String> modifiedPartitions = findModifiedPartitionsInternal(
+        icebergTable,
+        pastSnapshotTimeMil,
+        true,
+        Sets::newHashSet,
+        (acc, partitions) -> {
+          acc.addAll(partitions);
+          return false; // never short-circuit
+        }
+    );
+
+    return IcebergTableUtil.convertNameToMetastorePartition(
+        hiveTable, modifiedPartitions);
+  }
+
+  private List<Callable<Set<String>>> createPartitionNameTasks(
+      org.apache.iceberg.Table icebergTable,
+      List<Snapshot> relevantSnapshots,
+      Boolean latestSpecOnly) {
+
+    return relevantSnapshots.stream()
+        .map(snapshot -> (Callable<Set<String>>) () ->
+            IcebergTableUtil.getPartitionNames(
+                icebergTable,
+                getAffectedFiles(snapshot, icebergTable.io()),
+                latestSpecOnly))
+        .toList();
+  }
+
+  private List<ContentFile<?>> getAffectedFiles(Snapshot snapshot, FileIO io) {
+    return FluentIterable.<ContentFile<?>>concat(
+            snapshot.addedDataFiles(io),
+            snapshot.removedDataFiles(io),
+            snapshot.addedDeleteFiles(io),
+            snapshot.removedDeleteFiles(io))
+        .toList();
+  }
+
   /**
    * Checks if a table has had new commits since a given snapshot that were not caused by compaction.
    * @param icebergTable The Iceberg table to check.
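
For context, findModifiedPartitionsInternal() now feeds both callers through an ExecutorCompletionService, so snapshot results are consumed in completion order and hasModifiedPartitions() can stop at the first snapshot that reports a modified partition. A self-contained sketch of that short-circuit pattern (names are illustrative, not from the patch; requires Java 21 for virtual threads):

    // Assumed imports: java.util.List, java.util.Set, java.util.concurrent.*
    static boolean anyTaskReturnsNonEmpty(List<Callable<Set<String>>> tasks) throws Exception {
      try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
        CompletionService<Set<String>> cs = new ExecutorCompletionService<>(executor);
        tasks.forEach(cs::submit);
        for (int i = 0; i < tasks.size(); i++) {
          if (!cs.take().get().isEmpty()) {
            return true; // short-circuit on the first non-empty result
          }
        }
        return false;
      }
    }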
