This is an automated email from the ASF dual-hosted git repository.
difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 39b9ef131d9 HIVE-29437: Iceberg: Fix concurrency issues between compaction and concurrent write operations. (#6292)
39b9ef131d9 is described below
commit 39b9ef131d9beb5381d4274b9d9b5d913701e36b
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Tue Feb 10 13:13:37 2026 -0500
HIVE-29437: Iceberg: Fix concurrency issues between compaction and concurrent write operations. (#6292)
Co-authored-by: Dmitriy Fingerman <[email protected]>
---
.../mr/hive/HiveIcebergOutputCommitter.java | 5 +-
.../mr/hive/compaction/IcebergCompactionUtil.java | 18 +--
.../mr/hive/compaction/IcebergTableOptimizer.java | 141 ++++++++++++++-------
3 files changed, 109 insertions(+), 55 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index f46133d0f9d..348fdea28ba 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -612,9 +612,10 @@ private void commit(SnapshotUpdate<?> update) {
   */
  private void commitCompaction(Table table, Long snapshotId, long startTime, FilesForCommit results,
      String partitionPath, long fileSizeThreshold) {
-    List<DataFile> existingDataFiles = IcebergCompactionUtil.getDataFiles(table, partitionPath, fileSizeThreshold);
+    List<DataFile> existingDataFiles =
+        IcebergCompactionUtil.getDataFiles(table, snapshotId, partitionPath, fileSizeThreshold);
     List<DeleteFile> existingDeleteFiles = fileSizeThreshold == -1 ?
-        IcebergCompactionUtil.getDeleteFiles(table, partitionPath) : Collections.emptyList();
+        IcebergCompactionUtil.getDeleteFiles(table, snapshotId, partitionPath) : Collections.emptyList();
     RewriteFiles rewriteFiles = table.newRewrite();
     existingDataFiles.forEach(rewriteFiles::deleteFile);
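
The heart of the fix is in this hunk: the rewrite commit now plans files against the snapshot the compaction started from (snapshotId) instead of table.currentSnapshot(), so a writer that commits while the compaction runs cannot have its new files swept into the rewrite's delete set. A minimal sketch of the race being closed, reusing the Iceberg scan API from the hunk (illustrative, not the committed code):

    // S1 = snapshot captured when the compaction began.
    long startSnapshotId = table.currentSnapshot().snapshotId();
    // ... compaction rewrites S1's small files; meanwhile a concurrent
    // INSERT commits snapshot S2 with fresh data files ...
    RewriteFiles rewriteFiles = table.newRewrite();
    try (CloseableIterable<ScanTask> tasks =
        table.newBatchScan().useSnapshot(startSnapshotId).planFiles()) {
      // Pinned to S1, so S2's files can never be selected here. Planning on
      // currentSnapshot() would return S2's files and mark data for deletion
      // that was never actually compacted.
      for (ScanTask t : tasks) {
        rewriteFiles.deleteFile(t.asFileScanTask().file());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
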
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java
index 63f1c3edf21..fca301f870d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java
@@ -22,7 +22,6 @@
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PositionDeletesScanTask;
@@ -69,15 +68,16 @@ public static boolean shouldIncludeForCompaction(Table table, String partitionPa
    * @param table the iceberg table
    * @param partitionPath partition path
    */
-  public static List<DataFile> getDataFiles(Table table, String partitionPath, long fileSizeThreshold) {
-    CloseableIterable<FileScanTask> fileScanTasks =
-        table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
-    CloseableIterable<FileScanTask> filteredFileScanTasks =
-        CloseableIterable.filter(fileScanTasks, t -> {
+  public static List<DataFile> getDataFiles(Table table, Long snapshotId, String partitionPath,
+      long fileSizeThreshold) {
+    CloseableIterable<ScanTask> scanTasks =
+        table.newBatchScan().useSnapshot(snapshotId).planFiles();
+    CloseableIterable<ScanTask> filteredScanTasks =
+        CloseableIterable.filter(scanTasks, t -> {
       DataFile file = t.asFileScanTask().file();
       return shouldIncludeForCompaction(table, partitionPath, file, fileSizeThreshold);
     });
-    return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
+    return Lists.newArrayList(CloseableIterable.transform(filteredScanTasks, t -> t.asFileScanTask().file()));
   }
   /**
@@ -88,10 +88,10 @@ public static List<DataFile> getDataFiles(Table table, String partitionPath, lon
    * @param table the iceberg table
    * @param partitionPath partition path
    */
-  public static List<DeleteFile> getDeleteFiles(Table table, String partitionPath) {
+  public static List<DeleteFile> getDeleteFiles(Table table, Long snapshotId, String partitionPath) {
     Table deletesTable =
         MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
-    CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
+    CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().useSnapshot(snapshotId).planFiles();
     CloseableIterable<ScanTask> filteredDeletesScanTasks =
         CloseableIterable.filter(deletesScanTasks, t -> {
       DeleteFile file = ((PositionDeletesScanTask) t).file();
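
Both helpers now take the snapshot ID explicitly instead of resolving table.currentSnapshot() internally. A hypothetical caller (partition path and threshold are made-up values, not from the patch) would pin both lookups to the same snapshot so the selected data files and their position deletes stay mutually consistent:

    // Capture the snapshot once, then use it for every lookup.
    long snapshotId = table.currentSnapshot().snapshotId();
    // Data files in partition "p=1" below a 128 MB threshold, as of snapshotId.
    List<DataFile> dataFiles =
        IcebergCompactionUtil.getDataFiles(table, snapshotId, "p=1", 128L * 1024 * 1024);
    // Position-delete files for the same partition at the same snapshot.
    List<DeleteFile> deleteFiles =
        IcebergCompactionUtil.getDeleteFiles(table, snapshotId, "p=1");
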
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
index dfc352ee0da..5c612bd0479 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
@@ -19,15 +19,18 @@
 package org.apache.iceberg.mr.hive.compaction;

 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -96,13 +99,13 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac
       if (hasNewCommits(icebergTable, snapshotTimeMilCache.get(qualifiedTableName))) {
         if (icebergTable.spec().isPartitioned()) {
           List<org.apache.hadoop.hive.ql.metadata.Partition> partitions = findModifiedPartitions(hiveTable,
-              icebergTable, snapshotTimeMilCache.get(qualifiedTableName), true);
+              icebergTable, snapshotTimeMilCache.get(qualifiedTableName));
           partitions.forEach(partition -> addCompactionTargetIfEligible(table, icebergTable,
               partition.getName(), compactionTargets, currentCompactions, skipDBs, skipTables));
-          if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable) && !findModifiedPartitions(hiveTable,
-              icebergTable, snapshotTimeMilCache.get(qualifiedTableName), false).isEmpty()) {
+          if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable) && hasModifiedPartitions(icebergTable,
+              snapshotTimeMilCache.get(qualifiedTableName))) {
             addCompactionTargetIfEligible(table, icebergTable,
                 null, compactionTargets, currentCompactions, skipDBs, skipTables);
           }
@@ -160,58 +163,108 @@ private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table
     compactions.add(ci);
   }

-  /**
-   * Finds all unique non-compaction-modified partitions (with added or deleted files) between a given past
-   * snapshot ID and the table's current (latest) snapshot.
-   * @param hiveTable The {@link org.apache.hadoop.hive.ql.metadata.Table} instance to inspect.
-   * @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot to check from (exclusive).
-   * @param latestSpecOnly when True, returns partitions with the current spec only;
-   *                       False - older specs only;
-   *                       Null - any spec
-   * @return A List of {@link org.apache.hadoop.hive.ql.metadata.Partition} representing the unique modified
-   * partition names.
-   * @throws IllegalArgumentException if snapshot IDs are invalid or out of order, or if the table has no current
-   * snapshot.
-   */
-  private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadata.Table hiveTable,
-      org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil, Boolean latestSpecOnly) {
+  private <R> R findModifiedPartitionsInternal(
+      org.apache.iceberg.Table icebergTable,
+      Long pastSnapshotTimeMil,
+      Boolean latestSpecOnly,
+      Supplier<R> resultSupplier,
+      BiFunction<R, Set<String>, Boolean> resultConsumer
+  ) {
-    List<Snapshot> relevantSnapshots = getRelevantSnapshots(icebergTable, pastSnapshotTimeMil).toList();
+    List<Snapshot> relevantSnapshots =
+        getRelevantSnapshots(icebergTable, pastSnapshotTimeMil).toList();
+
+    R result = resultSupplier.get();
     if (relevantSnapshots.isEmpty()) {
-      return Collections.emptyList();
+      return result;
     }
+    List<Callable<Set<String>>> tasks =
+        createPartitionNameTasks(icebergTable, relevantSnapshots, latestSpecOnly);
+
     try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
-      // Submit a task for each snapshot and collect the Futures
-      List<Future<Set<String>>> futures = relevantSnapshots.stream()
-          .map(snapshot -> executor.submit(() -> {
-            FileIO io = icebergTable.io();
-            List<ContentFile<?>> affectedFiles = FluentIterable.<ContentFile<?>>concat(
-                snapshot.addedDataFiles(io),
-                snapshot.removedDataFiles(io),
-                snapshot.addedDeleteFiles(io),
-                snapshot.removedDeleteFiles(io))
-                .toList();
-            return IcebergTableUtil.getPartitionNames(icebergTable, affectedFiles, latestSpecOnly);
-          }))
-          .toList();
-
-      // Collect the results from all completed futures
-      Set<String> modifiedPartitions = Sets.newHashSet();
-      for (Future<Set<String>> future : futures) {
-        modifiedPartitions.addAll(future.get());
+      CompletionService<Set<String>> cs = new ExecutorCompletionService<>(executor);
+
+      // submit tasks
+      for (Callable<Set<String>> task : tasks) {
+        cs.submit(task);
       }
-      return IcebergTableUtil.convertNameToMetastorePartition(hiveTable, modifiedPartitions);
+      // process results
+      for (int i = 0; i < tasks.size(); i++) {
+        if (resultConsumer.apply(result, cs.take().get())) {
+          return (R) Boolean.TRUE; // short-circuit
+        }
+      }
+
+      return result;
+
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      throw new RuntimeMetaException(e, "Interrupted while finding modified partitions");
+      throw new RuntimeMetaException(
+          e, "Interrupted while processing modified partitions");
     } catch (ExecutionException e) {
-      // Just wrap this one in a runtime exception
-      throw new RuntimeMetaException(e, "Failed to find modified partitions in parallel");
+      throw new RuntimeMetaException(
+          e, "Failed to process modified partitions in parallel");
     }
   }

+  private boolean hasModifiedPartitions(
+      org.apache.iceberg.Table icebergTable,
+      Long pastSnapshotTimeMil) {
+
+    return findModifiedPartitionsInternal(
+        icebergTable,
+        pastSnapshotTimeMil,
+        false,
+        () -> false,
+        (ignored, partitions) -> !partitions.isEmpty()
+    );
+  }
+
+  private List<Partition> findModifiedPartitions(
+      org.apache.hadoop.hive.ql.metadata.Table hiveTable,
+      org.apache.iceberg.Table icebergTable,
+      Long pastSnapshotTimeMil) {
+
+    Set<String> modifiedPartitions = findModifiedPartitionsInternal(
+        icebergTable,
+        pastSnapshotTimeMil,
+        true,
+        Sets::newHashSet,
+        (acc, partitions) -> {
+          acc.addAll(partitions);
+          return false; // never short-circuit
+        }
+    );
+
+    return IcebergTableUtil.convertNameToMetastorePartition(
+        hiveTable, modifiedPartitions);
+  }
+
+  private List<Callable<Set<String>>> createPartitionNameTasks(
+      org.apache.iceberg.Table icebergTable,
+      List<Snapshot> relevantSnapshots,
+      Boolean latestSpecOnly) {
+
+    return relevantSnapshots.stream()
+        .map(snapshot -> (Callable<Set<String>>) () ->
+            IcebergTableUtil.getPartitionNames(
+                icebergTable,
+                getAffectedFiles(snapshot, icebergTable.io()),
+                latestSpecOnly))
+        .toList();
+  }
+
+  private List<ContentFile<?>> getAffectedFiles(Snapshot snapshot, FileIO io) {
+    return FluentIterable.<ContentFile<?>>concat(
+        snapshot.addedDataFiles(io),
+        snapshot.removedDataFiles(io),
+        snapshot.addedDeleteFiles(io),
+        snapshot.removedDeleteFiles(io))
+        .toList();
+  }
+
   /**
    * Checks if a table has had new commits since a given snapshot that were not caused by compaction.
    * @param icebergTable The Iceberg table to check.
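
The generic driver above is easy to misread at its short-circuit: when resultConsumer returns true, the method exits with (R) Boolean.TRUE, which is only type-safe because the sole short-circuiting caller is hasModifiedPartitions, where R is Boolean. A self-contained sketch of the underlying pattern with illustrative names (requires Java 21 for virtual threads; not the committed code):

    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class ShortCircuitScan {
      // Returns true as soon as any task yields a non-empty set, consuming
      // results in completion order instead of waiting on Futures in
      // submission order.
      static boolean anyNonEmpty(List<Callable<Set<String>>> tasks)
          throws InterruptedException, ExecutionException {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
          CompletionService<Set<String>> cs = new ExecutorCompletionService<>(executor);
          tasks.forEach(cs::submit);
          for (int i = 0; i < tasks.size(); i++) {
            if (!cs.take().get().isEmpty()) {
              return true; // stop consuming; close() still waits for in-flight tasks
            }
          }
          return false;
        }
      }
    }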