This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new de41011180 Core: Detect and merge duplicate DVs for a data file and
merge them before committing (#15006)
de41011180 is described below
commit de41011180b1e5bd87a12a5177f840c8dface38e
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Sat Feb 28 18:52:32 2026 -0700
Core: Detect and merge duplicate DVs for a data file and merge them before
committing (#15006)
---
core/src/main/java/org/apache/iceberg/DVUtil.java | 206 +++++++++++
.../apache/iceberg/MergingSnapshotProducer.java | 84 ++++-
.../java/org/apache/iceberg/SnapshotProducer.java | 3 -
.../apache/iceberg/deletes/BaseDVFileWriter.java | 26 +-
.../org/apache/iceberg/deletes/DVFileWriter.java | 17 +
.../main/java/org/apache/iceberg/io/IOUtil.java | 25 ++
.../test/java/org/apache/iceberg/TestRowDelta.java | 406 +++++++++++++++++++++
.../org/apache/iceberg/data/BaseDeleteLoader.java | 31 +-
.../spark/source/TestPositionDeletesTable.java | 8 +-
.../actions/TestRemoveDanglingDeleteAction.java | 12 +-
.../spark/source/TestPositionDeletesTable.java | 8 +-
.../actions/TestRemoveDanglingDeleteAction.java | 10 +-
.../spark/source/TestPositionDeletesTable.java | 8 +-
.../actions/TestRemoveDanglingDeleteAction.java | 12 +-
.../spark/source/TestPositionDeletesTable.java | 8 +-
15 files changed, 777 insertions(+), 87 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/DVUtil.java
b/core/src/main/java/org/apache/iceberg/DVUtil.java
new file mode 100644
index 0000000000..c323e96775
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DVUtil.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import org.apache.iceberg.deletes.BaseDVFileWriter;
+import org.apache.iceberg.deletes.DVFileWriter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.IOUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.Tasks;
+
+class DVUtil {
+ private DVUtil() {}
+
+ static PositionDeleteIndex readDV(DeleteFile deleteFile, FileIO fileIO) {
+ Preconditions.checkArgument(
+ ContentFileUtil.isDV(deleteFile),
+ "Cannot read, not a deletion vector: %s",
+ deleteFile.location());
+ InputFile inputFile = fileIO.newInputFile(deleteFile);
+ long offset = deleteFile.contentOffset();
+ int length = deleteFile.contentSizeInBytes().intValue();
+ byte[] bytes = new byte[length];
+ try {
+ IOUtil.readFully(inputFile, offset, bytes, 0, length);
+ return PositionDeleteIndex.deserialize(bytes, deleteFile);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /**
+ * Merges duplicate DVs for the same data file and writes the merged DV
Puffin files. If there is
+ * exactly 1 DV for a given data file then it is returned as is
+ *
+ * @param dvsByReferencedFile map of data file location to DVs
+ * @param mergedOutputLocation output location of the merged DVs
+ * @param fileIO fileIO to use when reading and writing
+ * @param specs partition specs
+ * @param pool executor service for reading DVs
+ * @return a list containing both any newly merged DVs and any DVs that are
already valid
+ */
+ static List<DeleteFile> mergeAndWriteDVsIfRequired(
+ Map<String, List<DeleteFile>> dvsByReferencedFile,
+ String mergedOutputLocation,
+ FileIO fileIO,
+ Map<Integer, PartitionSpec> specs,
+ ExecutorService pool) {
+ List<DeleteFile> finalDVs = Lists.newArrayList();
+ Multimap<String, DeleteFile> duplicates =
+ Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+ Map<String, Pair<PartitionSpec, StructLike>> partitions =
Maps.newHashMap();
+
+ for (Map.Entry<String, List<DeleteFile>> entry :
dvsByReferencedFile.entrySet()) {
+ if (entry.getValue().size() > 1) {
+ duplicates.putAll(entry.getKey(), entry.getValue());
+ DeleteFile first = entry.getValue().get(0);
+ partitions.put(entry.getKey(), Pair.of(specs.get(first.specId()),
first.partition()));
+ } else {
+ finalDVs.addAll(entry.getValue());
+ }
+ }
+
+ if (duplicates.isEmpty()) {
+ return finalDVs;
+ }
+
+ validateCanMerge(duplicates, partitions);
+
+ Map<String, PositionDeleteIndex> deletes =
+ readAndMergeDVs(duplicates.values().toArray(DeleteFile[]::new),
fileIO, pool);
+
+ finalDVs.addAll(writeDVs(deletes, fileIO, mergedOutputLocation,
partitions));
+ return finalDVs;
+ }
+
+ private static void validateCanMerge(
+ Multimap<String, DeleteFile> duplicates,
+ Map<String, Pair<PartitionSpec, StructLike>> partitions) {
+ Map<Integer, Comparator<StructLike>> comparatorsBySpecId =
Maps.newHashMap();
+ for (Map.Entry<String, Collection<DeleteFile>> entry :
duplicates.asMap().entrySet()) {
+ String referencedFile = entry.getKey();
+
+ // validate that each file matches the expected partition
+ Pair<PartitionSpec, StructLike> partition =
partitions.get(referencedFile);
+ Long sequenceNumber = Iterables.getFirst(entry.getValue(),
null).dataSequenceNumber();
+ PartitionSpec spec = partition.first();
+ StructLike tuple = partition.second();
+ Comparator<StructLike> comparator =
+ comparatorsBySpecId.computeIfAbsent(
+ spec.specId(), id -> Comparators.forType(spec.partitionType()));
+
+ for (DeleteFile dv : entry.getValue()) {
+ Preconditions.checkArgument(
+ Objects.equals(sequenceNumber, dv.dataSequenceNumber()),
+ "Cannot merge DVs, mismatched sequence numbers (%s, %s) for %s",
+ sequenceNumber,
+ dv.dataSequenceNumber(),
+ referencedFile);
+
+ Preconditions.checkArgument(
+ spec.specId() == dv.specId(),
+ "Cannot merge DVs, mismatched partition specs (%s, %s) for %s",
+ spec.specId(),
+ dv.specId(),
+ referencedFile);
+
+ Preconditions.checkArgument(
+ comparator.compare(tuple, dv.partition()) == 0,
+ "Cannot merge DVs, mismatched partition tuples (%s, %s) for %s",
+ tuple,
+ dv.partition(),
+ referencedFile);
+ }
+ }
+ }
+
+ /**
+ * Reads all DVs and merges the position indices per referenced data file
+ *
+ * @param duplicateDVs list of DVs to read and merge
+ * @param io the FileIO to use for reading DV files
+ * @param pool executor service for reading DVs
+ * @return map of referenced data file location to the merged position
delete index
+ */
+ private static Map<String, PositionDeleteIndex> readAndMergeDVs(
+ DeleteFile[] duplicateDVs, FileIO io, ExecutorService pool) {
+ // Read all duplicate DVs in parallel
+ PositionDeleteIndex[] duplicatedDVPositions = new
PositionDeleteIndex[duplicateDVs.length];
+ Tasks.range(duplicatedDVPositions.length)
+ .executeWith(pool)
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .run(i -> duplicatedDVPositions[i] = readDV(duplicateDVs[i], io));
+
+ Map<String, PositionDeleteIndex> mergedDVs = Maps.newHashMap();
+ for (int i = 0; i < duplicatedDVPositions.length; i++) {
+ DeleteFile dv = duplicateDVs[i];
+ PositionDeleteIndex previousDV =
mergedDVs.get(duplicateDVs[i].referencedDataFile());
+ if (previousDV != null) {
+ previousDV.merge(duplicatedDVPositions[i]);
+ } else {
+ mergedDVs.put(dv.referencedDataFile(), duplicatedDVPositions[i]);
+ }
+ }
+
+ return mergedDVs;
+ }
+
+ // Produces a single Puffin file containing the merged DVs
+ private static List<DeleteFile> writeDVs(
+ Map<String, PositionDeleteIndex> mergedIndexByFile,
+ FileIO fileIO,
+ String dvOutputLocation,
+ Map<String, Pair<PartitionSpec, StructLike>> partitions) {
+ OutputFile dvOutputFile = fileIO.newOutputFile(dvOutputLocation);
+ try (DVFileWriter dvFileWriter = new BaseDVFileWriter(() -> dvOutputFile,
path -> null)) {
+ for (Map.Entry<String, PositionDeleteIndex> entry :
mergedIndexByFile.entrySet()) {
+ String referencedLocation = entry.getKey();
+ PositionDeleteIndex mergedPositions = entry.getValue();
+ Pair<PartitionSpec, StructLike> partition =
partitions.get(referencedLocation);
+ dvFileWriter.delete(
+ referencedLocation, mergedPositions, partition.first(),
partition.second());
+ }
+ dvFileWriter.close();
+ return dvFileWriter.result().deleteFiles();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 79dcec3411..3d49da3653 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -30,13 +30,17 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptingFileIO;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
@@ -47,6 +51,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.DataFileSet;
@@ -55,6 +60,7 @@ import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,15 +88,17 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
private final ManifestFilterManager<DataFile> filterManager;
private final ManifestMergeManager<DeleteFile> deleteMergeManager;
private final ManifestFilterManager<DeleteFile> deleteFilterManager;
+ private final AtomicInteger dvMergeAttempt = new AtomicInteger(0);
// update data
private final Map<Integer, DataFileSet> newDataFilesBySpec =
Maps.newHashMap();
private Long newDataFilesDataSequenceNumber;
- private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec =
Maps.newHashMap();
- private final Set<String> newDVRefs = Sets.newHashSet();
+ private final List<DeleteFile> v2Deletes = Lists.newArrayList();
+ private final Map<String, List<DeleteFile>> dvsByReferencedFile =
Maps.newLinkedHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests =
Lists.newArrayList();
- private final SnapshotSummary.Builder addedFilesSummary =
SnapshotSummary.builder();
+ private final SnapshotSummary.Builder addedDataFilesSummary =
SnapshotSummary.builder();
+ private final SnapshotSummary.Builder addedDeleteFilesSummary =
SnapshotSummary.builder();
private final SnapshotSummary.Builder appendedManifestsSummary =
SnapshotSummary.builder();
private Expression deleteExpression = Expressions.alwaysFalse();
@@ -222,7 +230,8 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
}
protected boolean addsDeleteFiles() {
- return !newDeleteFilesBySpec.isEmpty();
+ return !v2Deletes.isEmpty()
+ || dvsByReferencedFile.values().stream().anyMatch(dvs ->
!dvs.isEmpty());
}
/** Add a data file to the new snapshot. */
@@ -238,7 +247,7 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
DataFileSet dataFiles =
newDataFilesBySpec.computeIfAbsent(spec.specId(), ignored ->
DataFileSet.create());
if (dataFiles.add(Delegates.suppressFirstRowId(file))) {
- addedFilesSummary.addedFile(spec, file);
+ addedDataFilesSummary.addedFile(spec, file);
hasNewDataFiles = true;
}
}
@@ -265,15 +274,14 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
"Cannot find partition spec %s for delete file: %s",
file.specId(),
file.location());
-
- DeleteFileSet deleteFiles =
- newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored ->
DeleteFileSet.create());
- if (deleteFiles.add(file)) {
- addedFilesSummary.addedFile(spec, file);
- hasNewDeleteFiles = true;
- if (ContentFileUtil.isDV(file)) {
- newDVRefs.add(file.referencedDataFile());
- }
+ hasNewDeleteFiles = true;
+ if (ContentFileUtil.isDV(file)) {
+ List<DeleteFile> dvsForReferencedFile =
+ dvsByReferencedFile.computeIfAbsent(
+ file.referencedDataFile(), newFile -> Lists.newArrayList());
+ dvsForReferencedFile.add(file);
+ } else {
+ v2Deletes.add(file);
}
}
@@ -814,7 +822,7 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
Expression conflictDetectionFilter,
Snapshot parent) {
// skip if there is no current table state or this operation doesn't add
new DVs
- if (parent == null || newDVRefs.isEmpty()) {
+ if (parent == null || dvsByReferencedFile.isEmpty()) {
return;
}
@@ -847,7 +855,7 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
DeleteFile file = entry.file();
if (newSnapshotIds.contains(entry.snapshotId()) &&
ContentFileUtil.isDV(file)) {
ValidationException.check(
- !newDVRefs.contains(file.referencedDataFile()),
+ !dvsByReferencedFile.containsKey(file.referencedDataFile()),
"Found concurrently added DV for %s: %s",
file.referencedDataFile(),
ContentFileUtil.dvDesc(file));
@@ -953,7 +961,8 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
// update the snapshot summary
summaryBuilder.clear();
- summaryBuilder.merge(addedFilesSummary);
+ summaryBuilder.merge(addedDataFilesSummary);
+ summaryBuilder.merge(addedDeleteFilesSummary);
summaryBuilder.merge(appendedManifestsSummary);
summaryBuilder.merge(filterManager.buildSummary(filtered));
summaryBuilder.merge(deleteFilterManager.buildSummary(filteredDeletes));
@@ -1061,7 +1070,7 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
}
private Iterable<ManifestFile> prepareDeleteManifests() {
- if (newDeleteFilesBySpec.isEmpty()) {
+ if (!addsDeleteFiles()) {
return ImmutableList.of();
}
@@ -1076,12 +1085,22 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
// this triggers a rewrite of all delete manifests even if there is only
one new delete file
// if there is a relevant use case in the future, the behavior can be
optimized
cachedNewDeleteManifests.clear();
+ // On cache invalidation of delete files, clear the summary because any
new DV could require a
+ // merge, and the summary cannot be generated until after merging is complete.
+ addedDeleteFilesSummary.clear();
}
if (cachedNewDeleteManifests.isEmpty()) {
+ List<DeleteFile> mergedDVs = mergeDVs();
+ Map<Integer, List<DeleteFile>> newDeleteFilesBySpec =
+ Streams.stream(Iterables.concat(mergedDVs,
DeleteFileSet.of(v2Deletes)))
+ .collect(Collectors.groupingBy(ContentFile::specId));
+
newDeleteFilesBySpec.forEach(
(specId, deleteFiles) -> {
PartitionSpec spec = ops().current().spec(specId);
+ deleteFiles.forEach(file ->
addedDeleteFilesSummary.addedFile(spec, file));
List<ManifestFile> newDeleteManifests =
writeDeleteManifests(deleteFiles, spec);
cachedNewDeleteManifests.addAll(newDeleteManifests);
});
@@ -1092,6 +1111,35 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
return cachedNewDeleteManifests;
}
+ private List<DeleteFile> mergeDVs() {
+ for (Map.Entry<String, List<DeleteFile>> entry :
dvsByReferencedFile.entrySet()) {
+ if (entry.getValue().size() > 1) {
+ LOG.warn(
+ "Merging {} duplicate DVs for data file {} in table {}.",
+ entry.getValue().size(),
+ entry.getKey(),
+ tableName);
+ }
+ }
+
+ FileIO fileIO = EncryptingFileIO.combine(ops().io(), ops().encryption());
+
+ String dvOutputLocation =
+ ops()
+ .locationProvider()
+ .newDataLocation(
+ FileFormat.PUFFIN.addExtension(
+ String.format(
+ "merged-dvs-%s-%s", snapshotId(),
dvMergeAttempt.incrementAndGet())));
+
+ return DVUtil.mergeAndWriteDVsIfRequired(
+ dvsByReferencedFile,
+ dvOutputLocation,
+ fileIO,
+ ops().current().specsById(),
+ ThreadPools.getDeleteWorkerPool());
+ }
+
private class DataFileFilterManager extends ManifestFilterManager<DataFile> {
private DataFileFilterManager() {
super(ops().current().specsById(),
MergingSnapshotProducer.this::workerPool);
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index cbae25132d..ffbebf5998 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -726,9 +726,6 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
try (RollingManifestWriter<DeleteFile> closableWriter = writer) {
for (DeleteFile file : files) {
- Preconditions.checkArgument(
- file instanceof Delegates.PendingDeleteFile,
- "Invalid delete file: must be PendingDeleteFile");
if (file.dataSequenceNumber() != null) {
closableWriter.add(file, file.dataSequenceNumber());
} else {
diff --git
a/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java
b/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java
index 6eabd64514..348b4d48d0 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
@@ -29,8 +30,8 @@ import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.BlobMetadata;
@@ -51,7 +52,7 @@ public class BaseDVFileWriter implements DVFileWriter {
private static final String REFERENCED_DATA_FILE_KEY =
"referenced-data-file";
private static final String CARDINALITY_KEY = "cardinality";
- private final OutputFileFactory fileFactory;
+ private final Supplier<OutputFile> dvOutputFile;
private final Function<String, PositionDeleteIndex> loadPreviousDeletes;
private final Map<String, Deletes> deletesByPath = Maps.newHashMap();
private final Map<String, BlobMetadata> blobsByPath = Maps.newHashMap();
@@ -59,7 +60,13 @@ public class BaseDVFileWriter implements DVFileWriter {
public BaseDVFileWriter(
OutputFileFactory fileFactory, Function<String, PositionDeleteIndex>
loadPreviousDeletes) {
- this.fileFactory = fileFactory;
+ this(() -> fileFactory.newOutputFile().encryptingOutputFile(),
loadPreviousDeletes);
+ }
+
+ public BaseDVFileWriter(
+ Supplier<OutputFile> dvOutputFile,
+ Function<String, PositionDeleteIndex> loadPreviousDeletes) {
+ this.dvOutputFile = dvOutputFile;
this.loadPreviousDeletes = loadPreviousDeletes;
}
@@ -71,6 +78,17 @@ public class BaseDVFileWriter implements DVFileWriter {
positions.delete(pos);
}
+ @Override
+ public void delete(
+ String path,
+ PositionDeleteIndex positionDeleteIndex,
+ PartitionSpec spec,
+ StructLike partition) {
+ Deletes deletes =
+ deletesByPath.computeIfAbsent(path, key -> new Deletes(path, spec,
partition));
+ deletes.positions().merge(positionDeleteIndex);
+ }
+
@Override
public DeleteWriteResult result() {
Preconditions.checkState(result != null, "Cannot get result from unclosed
writer");
@@ -148,7 +166,7 @@ public class BaseDVFileWriter implements DVFileWriter {
}
private PuffinWriter newWriter() {
- EncryptedOutputFile outputFile = fileFactory.newOutputFile();
+ OutputFile outputFile = dvOutputFile.get();
return
Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build();
}
diff --git a/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java
b/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java
index 2561f7be3d..88d7e02f8c 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java
@@ -43,4 +43,21 @@ public interface DVFileWriter extends Closeable {
* @return the writer result
*/
DeleteWriteResult result();
+
+ /**
+ * Marks every position that is deleted in positionDeleteIndex as deleted in
the given data file.
+ * Implementations should merge with existing position indices for the
provided path
+ *
+ * @param path the data file path
+ * @param positionDeleteIndex the position delete index containing all the
positions to delete
+ * @param spec the data file partition spec
+ * @param partition the data file partition
+ */
+ default void delete(
+ String path,
+ PositionDeleteIndex positionDeleteIndex,
+ PartitionSpec spec,
+ StructLike partition) {
+ positionDeleteIndex.forEach(position -> this.delete(path, position, spec,
partition));
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/io/IOUtil.java
b/core/src/main/java/org/apache/iceberg/io/IOUtil.java
index 37962d322d..13abcc160e 100644
--- a/core/src/main/java/org/apache/iceberg/io/IOUtil.java
+++ b/core/src/main/java/org/apache/iceberg/io/IOUtil.java
@@ -49,6 +49,31 @@ public class IOUtil {
}
}
+ /**
+ * Reads exactly {@code length} bytes from the input file starting at {@code
fileOffset} into the
+ * buffer. Uses range reads when supported.
+ *
+ * @param inputFile the file to read from
+ * @param fileOffset the position in the file to start reading from
+ * @param bytes a buffer to write into
+ * @param offset starting offset in the buffer for the data
+ * @param length number of bytes to read
+ * @throws IOException if there is an error while reading or if the end of
the stream is reached
+ * before reading length bytes
+ */
+ public static void readFully(
+ InputFile inputFile, long fileOffset, byte[] bytes, int offset, int
length)
+ throws IOException {
+ try (SeekableInputStream stream = inputFile.newStream()) {
+ if (stream instanceof RangeReadable) {
+ ((RangeReadable) stream).readFully(fileOffset, bytes, offset, length);
+ } else {
+ stream.seek(fileOffset);
+ readFully(stream, bytes, offset, length);
+ }
+ }
+ }
+
/** Writes a buffer into a stream, making multiple write calls if necessary.
*/
public static void writeFully(OutputStream outputStream, ByteBuffer buffer)
throws IOException {
if (!buffer.hasRemaining()) {
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index 59a73ba202..749dbdc6fe 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -35,19 +35,27 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.deletes.BaseDVFileWriter;
+import org.apache.iceberg.deletes.DVFileWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ContentFileUtil;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -1919,6 +1927,266 @@ public class TestRowDelta extends TestBase {
.hasMessageContaining("Found concurrently added DV for %s",
dataFile.location());
}
+ @TestTemplate
+ public void testDuplicateDVsAreMerged() throws IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ DataFile dataFile = newDataFile("data_bucket=0");
+ commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+ OutputFileFactory fileFactory =
+ OutputFileFactory.builderFor(table, 1,
1).format(FileFormat.PUFFIN).build();
+
+ DeleteFile deleteFile1 = dvWithPositions(dataFile, fileFactory, 0, 2);
+ DeleteFile deleteFile2 = dvWithPositions(dataFile, fileFactory, 2, 4);
+ DeleteFile deleteFile3 = dvWithPositions(dataFile, fileFactory, 4, 8);
+ RowDelta rowDelta1 =
+
table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).addDeletes(deleteFile3);
+
+ commit(table, rowDelta1, branch);
+
+ Iterable<DeleteFile> addedDeleteFiles =
+ latestSnapshot(table, branch).addedDeleteFiles(table.io());
+ assertThat(Iterables.size(addedDeleteFiles)).isEqualTo(1);
+ DeleteFile mergedDV = Iterables.getOnlyElement(addedDeleteFiles);
+
+ assertDVHasDeletedPositions(mergedDV, LongStream.range(0,
8).boxed()::iterator);
+ }
+
+ @TestTemplate
+ public void testDuplicateDVsMergedMultipleSpecs() throws IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ // append a partitioned data file
+ DataFile firstSnapshotDataFile = newDataFile("data_bucket=0");
+ commit(table, table.newAppend().appendFile(firstSnapshotDataFile), branch);
+
+ // remove the only partition field to make the spec unpartitioned
+ table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+
+ // append an unpartitioned data file
+ DataFile secondSnapshotDataFile = newDataFile("");
+ commit(table, table.newAppend().appendFile(secondSnapshotDataFile),
branch);
+
+ // evolve the spec and add a new partition field
+ table.updateSpec().addField("data").commit();
+
+ // append a data file with the new spec
+ DataFile thirdSnapshotDataFile = newDataFile("data=abc");
+ commit(table, table.newAppend().appendFile(thirdSnapshotDataFile), branch);
+
+ assertThat(table.specs()).hasSize(3);
+
+ OutputFileFactory fileFactory =
+ OutputFileFactory.builderFor(table, 1,
1).format(FileFormat.PUFFIN).build();
+
+ DataFile dataFile = newDataFile("data=xyz");
+ // For each data file, create two DVs covering positions [0,2) and [2,4)
+ DeleteFile deleteFile1a = dvWithPositions(firstSnapshotDataFile,
fileFactory, 0, 2);
+ DeleteFile deleteFile1b = dvWithPositions(firstSnapshotDataFile,
fileFactory, 2, 4);
+ DeleteFile deleteFile2a = dvWithPositions(secondSnapshotDataFile,
fileFactory, 0, 2);
+ DeleteFile deleteFile2b = dvWithPositions(secondSnapshotDataFile,
fileFactory, 2, 4);
+ DeleteFile deleteFile3a = dvWithPositions(thirdSnapshotDataFile,
fileFactory, 0, 2);
+ DeleteFile deleteFile3b = dvWithPositions(thirdSnapshotDataFile,
fileFactory, 2, 4);
+
+ commit(
+ table,
+ table
+ .newRowDelta()
+ .addRows(dataFile)
+ .addDeletes(deleteFile1a)
+ .addDeletes(deleteFile1b)
+ .addDeletes(deleteFile2a)
+ .addDeletes(deleteFile2b)
+ .addDeletes(deleteFile3a)
+ .addDeletes(deleteFile3b),
+ branch);
+
+ Snapshot snapshot = latestSnapshot(table, branch);
+ // Expect 3 merged DVs, one per data file
+ Iterable<DeleteFile> addedDeleteFiles =
snapshot.addedDeleteFiles(table.io());
+ List<DeleteFile> mergedDVs = Lists.newArrayList(addedDeleteFiles);
+ assertThat(mergedDVs).hasSize(3);
+ // A single Puffin file should be produced for all merged DVs across specs
+
assertThat(mergedDVs.stream().map(ContentFile::location).collect(Collectors.toSet()))
+ .hasSize(1);
+
+ DeleteFile committedDVForDataFile1 =
+ Iterables.getOnlyElement(
+ mergedDVs.stream()
+ .filter(
+ dv -> Objects.equals(dv.referencedDataFile(),
firstSnapshotDataFile.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0,
4).boxed()::iterator);
+
+ DeleteFile committedDVForDataFile2 =
+ Iterables.getOnlyElement(
+ mergedDVs.stream()
+ .filter(
+ dv ->
+ Objects.equals(dv.referencedDataFile(),
secondSnapshotDataFile.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0,
4).boxed()::iterator);
+
+ DeleteFile committedDVForDataFile3 =
+ Iterables.getOnlyElement(
+ mergedDVs.stream()
+ .filter(
+ dv -> Objects.equals(dv.referencedDataFile(),
thirdSnapshotDataFile.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile3, LongStream.range(0,
4).boxed()::iterator);
+ }
+
+ @TestTemplate
+ public void testDuplicateDVsAreMergedForMultipleReferenceFiles() throws
IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ DataFile dataFile1 = newDataFile("data_bucket=0");
+ DataFile dataFile2 = newDataFile("data_bucket=0");
+ commit(table, table.newRowDelta().addRows(dataFile1).addRows(dataFile2),
branch);
+
+ OutputFileFactory fileFactory =
+ OutputFileFactory.builderFor(table, 1,
1).format(FileFormat.PUFFIN).build();
+
+ // For each data file, create two DVs covering positions [0,2) and [2,4)
+ DeleteFile deleteFile1a = dvWithPositions(dataFile1, fileFactory, 0, 2);
+ DeleteFile deleteFile1b = dvWithPositions(dataFile1, fileFactory, 2, 4);
+ DeleteFile deleteFile2a = dvWithPositions(dataFile2, fileFactory, 0, 2);
+ DeleteFile deleteFile2b = dvWithPositions(dataFile2, fileFactory, 2, 4);
+
+ // Commit all four duplicate DVs
+ RowDelta rowDelta =
+ table
+ .newRowDelta()
+ .addDeletes(deleteFile1a)
+ .addDeletes(deleteFile1b)
+ .addDeletes(deleteFile2a)
+ .addDeletes(deleteFile2b);
+
+ commit(table, rowDelta, branch);
+
+ // Expect two merged DVs, one per data file
+ Iterable<DeleteFile> addedDeleteFiles =
+ latestSnapshot(table, branch).addedDeleteFiles(table.io());
+ List<DeleteFile> mergedDVs = Lists.newArrayList(addedDeleteFiles);
+
+ assertThat(mergedDVs).hasSize(2);
+ // Should be a single Puffin produced
+
assertThat(mergedDVs.stream().map(ContentFile::location).collect(Collectors.toSet()))
+ .hasSize(1);
+
+ DeleteFile committedDVForDataFile1 =
+ Iterables.getOnlyElement(
+ mergedDVs.stream()
+ .filter(dv -> Objects.equals(dv.referencedDataFile(),
dataFile1.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0,
4).boxed()::iterator);
+
+ DeleteFile committedDVForDataFile2 =
+ Iterables.getOnlyElement(
+ mergedDVs.stream()
+ .filter(dv -> Objects.equals(dv.referencedDataFile(),
dataFile2.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0,
4).boxed()::iterator);
+ }
+
+ @TestTemplate
+ public void testDuplicateDVsAndValidDV() throws IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ DataFile dataFile1 = newDataFile("data_bucket=0");
+ DataFile dataFile2 = newDataFile("data_bucket=0");
+ commit(table, table.newRowDelta().addRows(dataFile1).addRows(dataFile2),
branch);
+
+ OutputFileFactory fileFactory =
+ OutputFileFactory.builderFor(table, 1,
1).format(FileFormat.PUFFIN).build();
+
+ // dataFile1 has duplicate DVs that need merging
+ DeleteFile deleteFile1a = dvWithPositions(dataFile1, fileFactory, 0, 2);
+ DeleteFile deleteFile1b = dvWithPositions(dataFile1, fileFactory, 2, 4);
+
+ // dataFile2 has a valid DV
+ DeleteFile deleteFile2 = dvWithPositions(dataFile2, fileFactory, 0, 3);
+
+ RowDelta rowDelta =
+ table
+ .newRowDelta()
+ .addDeletes(deleteFile1a)
+ .addDeletes(deleteFile1b)
+ .addDeletes(deleteFile2);
+
+ commit(table, rowDelta, branch);
+
+ // Expect two DVs: one merged for dataFile1 and deleteFile2
+ Iterable<DeleteFile> addedDeleteFiles =
+ latestSnapshot(table, branch).addedDeleteFiles(table.io());
+ List<DeleteFile> committedDVs = Lists.newArrayList(addedDeleteFiles);
+
+ assertThat(committedDVs).hasSize(2);
+
+ // Verify merged DV for dataFile1 has positions [0,4)
+ DeleteFile committedDVForDataFile1 =
+ Iterables.getOnlyElement(
+ committedDVs.stream()
+ .filter(dv -> Objects.equals(dv.referencedDataFile(),
dataFile1.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0,
4).boxed()::iterator);
+
+ // Verify deleteFile2 state
+ DeleteFile committedDVForDataFile2 =
+ Iterables.getOnlyElement(
+ committedDVs.stream()
+ .filter(dv -> Objects.equals(dv.referencedDataFile(),
dataFile2.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0,
3).boxed()::iterator);
+ }
+
+ @TestTemplate
+ public void testDuplicateDVsAreMergedAndEqDelete() throws IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ DataFile dataFile = newDataFile("data_bucket=0");
+ commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+ OutputFileFactory fileFactory =
+ OutputFileFactory.builderFor(table, 1,
1).format(FileFormat.PUFFIN).build();
+
+ // Two DVs for the same data file: [0,2) and [2,4) => 4 deleted positions
total
+ DeleteFile dv1 = dvWithPositions(dataFile, fileFactory, 0, 2);
+ DeleteFile dv2 = dvWithPositions(dataFile, fileFactory, 2, 4);
+
+ // One equality delete file for the same partition
+ DeleteFile eqDelete =
+ newEqualityDeleteFile(
+ table.spec().specId(),
+ "data_bucket=0",
+ table.schema().asStruct().fields().get(0).fieldId());
+
+ RowDelta rowDelta =
table.newRowDelta().addDeletes(eqDelete).addDeletes(dv1).addDeletes(dv2);
+
+ commit(table, rowDelta, branch);
+
+ Iterable<DeleteFile> addedDeleteFiles =
+ latestSnapshot(table, branch).addedDeleteFiles(table.io());
+ List<DeleteFile> committedDeletes = Lists.newArrayList(addedDeleteFiles);
+
+ // 1 DV + 1 equality delete
+ assertThat(committedDeletes).hasSize(2);
+
+ DeleteFile committedDV =
+ Iterables.getOnlyElement(
+
committedDeletes.stream().filter(ContentFileUtil::isDV).collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDV, LongStream.range(0,
4).boxed()::iterator);
+
+ DeleteFile committedEqDelete =
+ Iterables.getOnlyElement(
+ committedDeletes.stream()
+ .filter(df -> df.content() == FileContent.EQUALITY_DELETES)
+ .collect(Collectors.toList()));
+ assertThat(committedEqDelete).isNotNull();
+
assertThat(committedEqDelete.content()).isEqualTo(FileContent.EQUALITY_DELETES);
+ }
+
@TestTemplate
public void testManifestMergingAfterUpgradeToV3() {
assumeThat(formatVersion).isEqualTo(2);
@@ -2001,4 +2269,142 @@ public class TestRowDelta extends TestBase {
throw new RuntimeException(e);
}
}
+
+ @TestTemplate
+ public void testCannotMergeDVsMismatchedSequenceNumbers() {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ DataFile dataFile = newDataFile("data_bucket=0");
+ commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+ DeleteFile dv1 =
+ FileMetadata.deleteFileBuilder(table.spec())
+ .ofPositionDeletes()
+ .withFormat(FileFormat.PUFFIN)
+ .withPath("/tmp/dv-1.puffin")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .withPartition(dataFile.partition())
+ .withReferencedDataFile(dataFile.location())
+ .withContentOffset(0)
+ .withContentSizeInBytes(10)
+ .build();
+ DeleteFile dv2 =
+
FileMetadata.deleteFileBuilder(table.spec()).copy(dv1).withPath("/tmp/dv-2.puffin").build();
+
+ // Use protected add(DeleteFile, long) to assign different data sequence
numbers
+ MergingSnapshotProducer<?> rowDelta = (MergingSnapshotProducer<?>)
table.newRowDelta();
+ rowDelta.add(dv1, 1L);
+ rowDelta.add(dv2, 2L);
+
+ assertThatThrownBy(() -> commit(table, rowDelta, branch))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot merge DVs, mismatched sequence numbers");
+ }
+
+ @TestTemplate
+ public void testCannotMergeDVsMismatchedPartitionSpecs() {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ DataFile dataFile = newDataFile("data_bucket=0");
+ commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+ // Evolve the spec so we have two distinct spec IDs
+ table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+ PartitionSpec originalSpec = table.specs().get(0);
+ PartitionSpec evolvedSpec = table.specs().get(1);
+
+ DeleteFile dv1 =
+ FileMetadata.deleteFileBuilder(originalSpec)
+ .ofPositionDeletes()
+ .withFormat(FileFormat.PUFFIN)
+ .withPath("/tmp/dv-1.puffin")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .withPartition(dataFile.partition())
+ .withReferencedDataFile(dataFile.location())
+ .withContentOffset(0)
+ .withContentSizeInBytes(10)
+ .build();
+ DeleteFile dv2 =
+
FileMetadata.deleteFileBuilder(evolvedSpec).copy(dv1).withPath("/tmp/dv-2.puffin").build();
+
+ assertThatThrownBy(
+ () -> commit(table,
table.newRowDelta().addDeletes(dv1).addDeletes(dv2), branch))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot merge DVs, mismatched partition specs");
+ }
+
+ @TestTemplate
+ public void testCannotMergeDVsMismatchedPartitionTuples() {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ DataFile dataFile = newDataFile("data_bucket=0");
+ commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+ DeleteFile dv1 =
+ FileMetadata.deleteFileBuilder(table.spec())
+ .ofPositionDeletes()
+ .withFormat(FileFormat.PUFFIN)
+ .withPath("/tmp/dv-1.puffin")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .withPartitionPath("data_bucket=0")
+ .withReferencedDataFile(dataFile.location())
+ .withContentOffset(0)
+ .withContentSizeInBytes(10)
+ .build();
+ DeleteFile dv2 =
+ FileMetadata.deleteFileBuilder(table.spec())
+ .copy(dv1)
+ .withPath("/tmp/dv-2.puffin")
+ .withPartitionPath("data_bucket=1")
+ .build();
+
+ assertThatThrownBy(
+ () -> commit(table,
table.newRowDelta().addDeletes(dv1).addDeletes(dv2), branch))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot merge DVs, mismatched partition tuples");
+ }
+
+ private DeleteFile dvWithPositions(
+ DataFile dataFile, OutputFileFactory fileFactory, int fromInclusive, int
toExclusive)
+ throws IOException {
+
+ List<PositionDelete<?>> deletes = Lists.newArrayList();
+ for (int i = fromInclusive; i < toExclusive; i++) {
+ deletes.add(PositionDelete.create().set(dataFile.location(), i));
+ }
+
+ return writeDV(deletes, dataFile.specId(), dataFile.partition(),
fileFactory);
+ }
+
+ private void assertDVHasDeletedPositions(DeleteFile dv, Iterable<Long>
positions) {
+ assertThat(dv).isNotNull();
+ PositionDeleteIndex index = DVUtil.readDV(dv, table.io());
+ assertThat(positions)
+ .allSatisfy(
+ pos ->
+ assertThat(index.isDeleted(pos))
+ .as("Expected position %s to be deleted", pos)
+ .isTrue());
+ }
+
+ private DeleteFile writeDV(
+ List<PositionDelete<?>> deletes,
+ int specId,
+ StructLike partition,
+ OutputFileFactory fileFactory)
+ throws IOException {
+
+ DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null);
+ try (DVFileWriter closeableWriter = writer) {
+ for (PositionDelete<?> delete : deletes) {
+ closeableWriter.delete(
+ delete.path().toString(), delete.pos(), table.specs().get(specId),
partition);
+ }
+ }
+
+ return Iterables.getOnlyElement(writer.result().deleteFiles());
+ }
}
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
index 8dbb9dd44b..02b06b70e4 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -39,13 +39,11 @@ import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.RangeReadable;
-import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.CharSequenceMap;
import org.apache.iceberg.util.ContentFileUtil;
@@ -175,8 +173,13 @@ public class BaseDeleteLoader implements DeleteLoader {
InputFile inputFile = loadInputFile.apply(dv);
long offset = dv.contentOffset();
int length = dv.contentSizeInBytes().intValue();
- byte[] bytes = readBytes(inputFile, offset, length);
- return PositionDeleteIndex.deserialize(bytes, dv);
+ byte[] bytes = new byte[length];
+ try {
+ IOUtil.readFully(inputFile, offset, bytes, 0, length);
+ return PositionDeleteIndex.deserialize(bytes, dv);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
private PositionDeleteIndex getOrReadPosDeletes(
@@ -279,22 +282,4 @@ public class BaseDeleteLoader implements DeleteLoader {
filePath,
dv.referencedDataFile());
}
-
- private static byte[] readBytes(InputFile inputFile, long offset, int
length) {
- try (SeekableInputStream stream = inputFile.newStream()) {
- byte[] bytes = new byte[length];
-
- if (stream instanceof RangeReadable) {
- RangeReadable rangeReadable = (RangeReadable) stream;
- rangeReadable.readFully(offset, bytes);
- } else {
- stream.seek(offset);
- ByteStreams.readFully(stream, bytes);
- }
-
- return bytes;
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index c30a730917..81954c19b5 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -409,7 +409,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
// Add position deletes for both partitions
Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab,
dataFileA, "a");
- Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileA, "b");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileB, "b");
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
@@ -455,7 +455,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
- deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"});
+ deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"});
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
// Prepare expected values
@@ -496,7 +496,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
DataFile dataFileB = dataFile(tab, "b");
tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab,
dataFileA, "a");
- Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileA, "b");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileB, "b");
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
// Switch partition spec from (data) to (id)
@@ -508,7 +508,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit();
Pair<List<PositionDelete<?>>, DeleteFile> deletes10 = deleteFile(tab,
dataFile10, 10);
- Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab,
dataFile10, 99);
+ Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab,
dataFile99, 99);
tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit();
// Query partition of old spec
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
index 76084c2b94..4df99ca199 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
@@ -77,7 +77,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
.build();
static final DataFile FILE_A2 =
DataFiles.builder(SPEC)
- .withPath("/path/to/data-a.parquet")
+ .withPath("/path/to/data-a2.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=a") // easy way to set partition data for now
.withRecordCount(1)
@@ -91,7 +91,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
.build();
static final DataFile FILE_B2 =
DataFiles.builder(SPEC)
- .withPath("/path/to/data-b.parquet")
+ .withPath("/path/to/data-b2.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=b") // easy way to set partition data for now
.withRecordCount(1)
@@ -105,7 +105,7 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
.build();
static final DataFile FILE_C2 =
DataFiles.builder(SPEC)
- .withPath("/path/to/data-c.parquet")
+ .withPath("/path/to/data-c2.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=c") // easy way to set partition data for now
.withRecordCount(1)
@@ -119,7 +119,7 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
.build();
static final DataFile FILE_D2 =
DataFiles.builder(SPEC)
- .withPath("/path/to/data-d.parquet")
+ .withPath("/path/to/data-d2.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=d") // easy way to set partition data for now
.withRecordCount(1)
@@ -370,7 +370,6 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
// Add Data Files with EQ and POS deletes
DeleteFile fileADeletes = fileADeletes();
DeleteFile fileA2Deletes = fileA2Deletes();
- DeleteFile fileBDeletes = fileBDeletes();
DeleteFile fileB2Deletes = fileB2Deletes();
table
.newRowDelta()
@@ -382,7 +381,6 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
.addDeletes(fileA2Deletes)
.addDeletes(FILE_A_EQ_DELETES)
.addDeletes(FILE_A2_EQ_DELETES)
- .addDeletes(fileBDeletes)
.addDeletes(fileB2Deletes)
.addDeletes(FILE_B_EQ_DELETES)
.addDeletes(FILE_B2_EQ_DELETES)
@@ -400,7 +398,6 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()),
Tuple2.apply(2L, fileA2Deletes.location()),
Tuple2.apply(2L, FILE_B_EQ_DELETES.location()),
- Tuple2.apply(2L, fileBDeletes.location()),
Tuple2.apply(2L, FILE_B2.location()),
Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()),
Tuple2.apply(2L, fileB2Deletes.location()),
@@ -433,7 +430,6 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
Tuple2.apply(2L, FILE_A2.location()),
Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()),
Tuple2.apply(2L, fileA2Deletes.location()),
- Tuple2.apply(2L, fileBDeletes.location()),
Tuple2.apply(2L, FILE_B2.location()),
Tuple2.apply(2L, fileB2Deletes.location()),
Tuple2.apply(2L, FILE_C2.location()),
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 87cbbe3cea..8032b0b782 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -409,7 +409,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
// Add position deletes for both partitions
Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab,
dataFileA, "a");
- Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileA, "b");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileB, "b");
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
@@ -455,7 +455,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
- deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"});
+ deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"});
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
// Prepare expected values
@@ -496,7 +496,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
DataFile dataFileB = dataFile(tab, "b");
tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab,
dataFileA, "a");
- Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileA, "b");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileB, "b");
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
// Switch partition spec from (data) to (id)
@@ -508,7 +508,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit();
Pair<List<PositionDelete<?>>, DeleteFile> deletes10 = deleteFile(tab,
dataFile10, 10);
- Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab,
dataFile10, 99);
+ Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab,
dataFile99, 99);
tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit();
// Query partition of old spec
diff --git
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
index 76084c2b94..1e4c21d214 100644
---
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
+++
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
@@ -77,7 +77,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
.build();
static final DataFile FILE_A2 =
DataFiles.builder(SPEC)
- .withPath("/path/to/data-a.parquet")
+ .withPath("/path/to/data-a2.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=a") // easy way to set partition data for now
.withRecordCount(1)
@@ -91,7 +91,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
.build();
static final DataFile FILE_B2 =
DataFiles.builder(SPEC)
- .withPath("/path/to/data-b.parquet")
+ .withPath("/path/to/data-b2.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=b") // easy way to set partition data for now
.withRecordCount(1)
@@ -105,7 +105,7 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
.build();
static final DataFile FILE_C2 =
DataFiles.builder(SPEC)
- .withPath("/path/to/data-c.parquet")
+ .withPath("/path/to/data-c2.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=c") // easy way to set partition data for now
.withRecordCount(1)
@@ -370,7 +370,6 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
// Add Data Files with EQ and POS deletes
DeleteFile fileADeletes = fileADeletes();
DeleteFile fileA2Deletes = fileA2Deletes();
- DeleteFile fileBDeletes = fileBDeletes();
DeleteFile fileB2Deletes = fileB2Deletes();
table
.newRowDelta()
@@ -382,7 +381,6 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
.addDeletes(fileA2Deletes)
.addDeletes(FILE_A_EQ_DELETES)
.addDeletes(FILE_A2_EQ_DELETES)
- .addDeletes(fileBDeletes)
.addDeletes(fileB2Deletes)
.addDeletes(FILE_B_EQ_DELETES)
.addDeletes(FILE_B2_EQ_DELETES)
@@ -400,7 +398,6 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()),
Tuple2.apply(2L, fileA2Deletes.location()),
Tuple2.apply(2L, FILE_B_EQ_DELETES.location()),
- Tuple2.apply(2L, fileBDeletes.location()),
Tuple2.apply(2L, FILE_B2.location()),
Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()),
Tuple2.apply(2L, fileB2Deletes.location()),
@@ -433,7 +430,6 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
Tuple2.apply(2L, FILE_A2.location()),
Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()),
Tuple2.apply(2L, fileA2Deletes.location()),
- Tuple2.apply(2L, fileBDeletes.location()),
Tuple2.apply(2L, FILE_B2.location()),
Tuple2.apply(2L, fileB2Deletes.location()),
Tuple2.apply(2L, FILE_C2.location()),
diff --git
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 7892fd65b4..f5456db8e4 100644
---
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -409,7 +409,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
// Add position deletes for both partitions
Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab,
dataFileA, "a");
- Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileA, "b");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileB, "b");
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
@@ -455,7 +455,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
- deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"});
+ deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"});
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
// Prepare expected values
@@ -496,7 +496,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
DataFile dataFileB = dataFile(tab, "b");
tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab,
dataFileA, "a");
- Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileA, "b");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileB, "b");
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
// Switch partition spec from (data) to (id)
@@ -508,7 +508,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit();
Pair<List<PositionDelete<?>>, DeleteFile> deletes10 = deleteFile(tab,
dataFile10, 10);
- Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab,
dataFile10, 99);
+ Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab,
dataFile99, 99);
tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit();
// Query partition of old spec
diff --git
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
index 76084c2b94..4df99ca199 100644
---
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
+++
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
@@ -77,7 +77,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
.build();
static final DataFile FILE_A2 =
DataFiles.builder(SPEC)
- .withPath("/path/to/data-a.parquet")
+ .withPath("/path/to/data-a2.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=a") // easy way to set partition data for now
.withRecordCount(1)
@@ -91,7 +91,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
.build();
static final DataFile FILE_B2 =
DataFiles.builder(SPEC)
- .withPath("/path/to/data-b.parquet")
+ .withPath("/path/to/data-b2.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=b") // easy way to set partition data for now
.withRecordCount(1)
@@ -105,7 +105,7 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
.build();
static final DataFile FILE_C2 =
DataFiles.builder(SPEC)
- .withPath("/path/to/data-c.parquet")
+ .withPath("/path/to/data-c2.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=c") // easy way to set partition data for now
.withRecordCount(1)
@@ -119,7 +119,7 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
.build();
static final DataFile FILE_D2 =
DataFiles.builder(SPEC)
- .withPath("/path/to/data-d.parquet")
+ .withPath("/path/to/data-d2.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=d") // easy way to set partition data for now
.withRecordCount(1)
@@ -370,7 +370,6 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
// Add Data Files with EQ and POS deletes
DeleteFile fileADeletes = fileADeletes();
DeleteFile fileA2Deletes = fileA2Deletes();
- DeleteFile fileBDeletes = fileBDeletes();
DeleteFile fileB2Deletes = fileB2Deletes();
table
.newRowDelta()
@@ -382,7 +381,6 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
.addDeletes(fileA2Deletes)
.addDeletes(FILE_A_EQ_DELETES)
.addDeletes(FILE_A2_EQ_DELETES)
- .addDeletes(fileBDeletes)
.addDeletes(fileB2Deletes)
.addDeletes(FILE_B_EQ_DELETES)
.addDeletes(FILE_B2_EQ_DELETES)
@@ -400,7 +398,6 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()),
Tuple2.apply(2L, fileA2Deletes.location()),
Tuple2.apply(2L, FILE_B_EQ_DELETES.location()),
- Tuple2.apply(2L, fileBDeletes.location()),
Tuple2.apply(2L, FILE_B2.location()),
Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()),
Tuple2.apply(2L, fileB2Deletes.location()),
@@ -433,7 +430,6 @@ public class TestRemoveDanglingDeleteAction extends
TestBase {
Tuple2.apply(2L, FILE_A2.location()),
Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()),
Tuple2.apply(2L, fileA2Deletes.location()),
- Tuple2.apply(2L, fileBDeletes.location()),
Tuple2.apply(2L, FILE_B2.location()),
Tuple2.apply(2L, fileB2Deletes.location()),
Tuple2.apply(2L, FILE_C2.location()),
diff --git
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 5641c7b2a0..0e77e70e69 100644
---
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -409,7 +409,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
// Add position deletes for both partitions
Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab,
dataFileA, "a");
- Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileA, "b");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileB, "b");
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
@@ -455,7 +455,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
- deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"});
+ deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"});
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
// Prepare expected values
@@ -496,7 +496,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
DataFile dataFileB = dataFile(tab, "b");
tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab,
dataFileA, "a");
- Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileA, "b");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab,
dataFileB, "b");
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
// Switch partition spec from (data) to (id)
@@ -508,7 +508,7 @@ public class TestPositionDeletesTable extends
CatalogTestBase {
tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit();
Pair<List<PositionDelete<?>>, DeleteFile> deletes10 = deleteFile(tab,
dataFile10, 10);
- Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab,
dataFile10, 99);
+ Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab,
dataFile99, 99);
tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit();
// Query partition of old spec