This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new de41011180 Core: Detect and merge duplicate DVs for a data file and 
merge them before committing (#15006)
de41011180 is described below

commit de41011180b1e5bd87a12a5177f840c8dface38e
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Sat Feb 28 18:52:32 2026 -0700

    Core: Detect and merge duplicate DVs for a data file and merge them before 
committing (#15006)
---
 core/src/main/java/org/apache/iceberg/DVUtil.java  | 206 +++++++++++
 .../apache/iceberg/MergingSnapshotProducer.java    |  84 ++++-
 .../java/org/apache/iceberg/SnapshotProducer.java  |   3 -
 .../apache/iceberg/deletes/BaseDVFileWriter.java   |  26 +-
 .../org/apache/iceberg/deletes/DVFileWriter.java   |  17 +
 .../main/java/org/apache/iceberg/io/IOUtil.java    |  25 ++
 .../test/java/org/apache/iceberg/TestRowDelta.java | 406 +++++++++++++++++++++
 .../org/apache/iceberg/data/BaseDeleteLoader.java  |  31 +-
 .../spark/source/TestPositionDeletesTable.java     |   8 +-
 .../actions/TestRemoveDanglingDeleteAction.java    |  12 +-
 .../spark/source/TestPositionDeletesTable.java     |   8 +-
 .../actions/TestRemoveDanglingDeleteAction.java    |  10 +-
 .../spark/source/TestPositionDeletesTable.java     |   8 +-
 .../actions/TestRemoveDanglingDeleteAction.java    |  12 +-
 .../spark/source/TestPositionDeletesTable.java     |   8 +-
 15 files changed, 777 insertions(+), 87 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/DVUtil.java 
b/core/src/main/java/org/apache/iceberg/DVUtil.java
new file mode 100644
index 0000000000..c323e96775
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DVUtil.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import org.apache.iceberg.deletes.BaseDVFileWriter;
+import org.apache.iceberg.deletes.DVFileWriter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.IOUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.Tasks;
+
+class DVUtil {
+  private DVUtil() {}
+
+  static PositionDeleteIndex readDV(DeleteFile deleteFile, FileIO fileIO) {
+    Preconditions.checkArgument(
+        ContentFileUtil.isDV(deleteFile),
+        "Cannot read, not a deletion vector: %s",
+        deleteFile.location());
+    InputFile inputFile = fileIO.newInputFile(deleteFile);
+    long offset = deleteFile.contentOffset();
+    int length = deleteFile.contentSizeInBytes().intValue();
+    byte[] bytes = new byte[length];
+    try {
+      IOUtil.readFully(inputFile, offset, bytes, 0, length);
+      return PositionDeleteIndex.deserialize(bytes, deleteFile);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Merges duplicate DVs for the same data file and writes the merged DV 
Puffin files. If there is
+   * exactly 1 DV for a given data file then it is returned as is
+   *
+   * @param dvsByReferencedFile map of data file location to DVs
+   * @param mergedOutputLocation output location of the merged DVs
+   * @param fileIO fileIO to use when reading and writing
+   * @param specs partition specs
+   * @param pool executor service for reading DVs
+   * @return a list containing both any newly merged DVs and any DVs that are 
already valid
+   */
+  static List<DeleteFile> mergeAndWriteDVsIfRequired(
+      Map<String, List<DeleteFile>> dvsByReferencedFile,
+      String mergedOutputLocation,
+      FileIO fileIO,
+      Map<Integer, PartitionSpec> specs,
+      ExecutorService pool) {
+    List<DeleteFile> finalDVs = Lists.newArrayList();
+    Multimap<String, DeleteFile> duplicates =
+        Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+    Map<String, Pair<PartitionSpec, StructLike>> partitions = 
Maps.newHashMap();
+
+    for (Map.Entry<String, List<DeleteFile>> entry : 
dvsByReferencedFile.entrySet()) {
+      if (entry.getValue().size() > 1) {
+        duplicates.putAll(entry.getKey(), entry.getValue());
+        DeleteFile first = entry.getValue().get(0);
+        partitions.put(entry.getKey(), Pair.of(specs.get(first.specId()), 
first.partition()));
+      } else {
+        finalDVs.addAll(entry.getValue());
+      }
+    }
+
+    if (duplicates.isEmpty()) {
+      return finalDVs;
+    }
+
+    validateCanMerge(duplicates, partitions);
+
+    Map<String, PositionDeleteIndex> deletes =
+        readAndMergeDVs(duplicates.values().toArray(DeleteFile[]::new), 
fileIO, pool);
+
+    finalDVs.addAll(writeDVs(deletes, fileIO, mergedOutputLocation, 
partitions));
+    return finalDVs;
+  }
+
+  private static void validateCanMerge(
+      Multimap<String, DeleteFile> duplicates,
+      Map<String, Pair<PartitionSpec, StructLike>> partitions) {
+    Map<Integer, Comparator<StructLike>> comparatorsBySpecId = 
Maps.newHashMap();
+    for (Map.Entry<String, Collection<DeleteFile>> entry : 
duplicates.asMap().entrySet()) {
+      String referencedFile = entry.getKey();
+
+      // validate that each file matches the expected partition
+      Pair<PartitionSpec, StructLike> partition = 
partitions.get(referencedFile);
+      Long sequenceNumber = Iterables.getFirst(entry.getValue(), 
null).dataSequenceNumber();
+      PartitionSpec spec = partition.first();
+      StructLike tuple = partition.second();
+      Comparator<StructLike> comparator =
+          comparatorsBySpecId.computeIfAbsent(
+              spec.specId(), id -> Comparators.forType(spec.partitionType()));
+
+      for (DeleteFile dv : entry.getValue()) {
+        Preconditions.checkArgument(
+            Objects.equals(sequenceNumber, dv.dataSequenceNumber()),
+            "Cannot merge DVs, mismatched sequence numbers (%s, %s) for %s",
+            sequenceNumber,
+            dv.dataSequenceNumber(),
+            referencedFile);
+
+        Preconditions.checkArgument(
+            spec.specId() == dv.specId(),
+            "Cannot merge DVs, mismatched partition specs (%s, %s) for %s",
+            spec.specId(),
+            dv.specId(),
+            referencedFile);
+
+        Preconditions.checkArgument(
+            comparator.compare(tuple, dv.partition()) == 0,
+            "Cannot merge DVs, mismatched partition tuples (%s, %s) for %s",
+            tuple,
+            dv.partition(),
+            referencedFile);
+      }
+    }
+  }
+
+  /**
+   * Reads all DVs and merges the position indices per referenced data file
+   *
+   * @param duplicateDVs list of DVs to read and merge
+   * @param io the FileIO to use for reading DV files
+   * @param pool executor service for reading DVs
+   * @return map of referenced data file location to the merged position 
delete index
+   */
+  private static Map<String, PositionDeleteIndex> readAndMergeDVs(
+      DeleteFile[] duplicateDVs, FileIO io, ExecutorService pool) {
+    // Read all duplicate DVs in parallel
+    PositionDeleteIndex[] duplicatedDVPositions = new 
PositionDeleteIndex[duplicateDVs.length];
+    Tasks.range(duplicatedDVPositions.length)
+        .executeWith(pool)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .run(i -> duplicatedDVPositions[i] = readDV(duplicateDVs[i], io));
+
+    Map<String, PositionDeleteIndex> mergedDVs = Maps.newHashMap();
+    for (int i = 0; i < duplicatedDVPositions.length; i++) {
+      DeleteFile dv = duplicateDVs[i];
+      PositionDeleteIndex previousDV = 
mergedDVs.get(duplicateDVs[i].referencedDataFile());
+      if (previousDV != null) {
+        previousDV.merge(duplicatedDVPositions[i]);
+      } else {
+        mergedDVs.put(dv.referencedDataFile(), duplicatedDVPositions[i]);
+      }
+    }
+
+    return mergedDVs;
+  }
+
+  // Produces a single Puffin file containing the merged DVs
+  private static List<DeleteFile> writeDVs(
+      Map<String, PositionDeleteIndex> mergedIndexByFile,
+      FileIO fileIO,
+      String dvOutputLocation,
+      Map<String, Pair<PartitionSpec, StructLike>> partitions) {
+    OutputFile dvOutputFile = fileIO.newOutputFile(dvOutputLocation);
+    try (DVFileWriter dvFileWriter = new BaseDVFileWriter(() -> dvOutputFile, 
path -> null)) {
+      for (Map.Entry<String, PositionDeleteIndex> entry : 
mergedIndexByFile.entrySet()) {
+        String referencedLocation = entry.getKey();
+        PositionDeleteIndex mergedPositions = entry.getValue();
+        Pair<PartitionSpec, StructLike> partition = 
partitions.get(referencedLocation);
+        dvFileWriter.delete(
+            referencedLocation, mergedPositions, partition.first(), 
partition.second());
+      }
+      dvFileWriter.close();
+      return dvFileWriter.result().deleteFiles();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java 
b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 79dcec3411..3d49da3653 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -30,13 +30,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptingFileIO;
 import org.apache.iceberg.events.CreateSnapshotEvent;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Predicate;
@@ -47,6 +51,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.DataFileSet;
@@ -55,6 +60,7 @@ import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PartitionSet;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,15 +88,17 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
   private final ManifestFilterManager<DataFile> filterManager;
   private final ManifestMergeManager<DeleteFile> deleteMergeManager;
   private final ManifestFilterManager<DeleteFile> deleteFilterManager;
+  private final AtomicInteger dvMergeAttempt = new AtomicInteger(0);
 
   // update data
   private final Map<Integer, DataFileSet> newDataFilesBySpec = 
Maps.newHashMap();
   private Long newDataFilesDataSequenceNumber;
-  private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec = 
Maps.newHashMap();
-  private final Set<String> newDVRefs = Sets.newHashSet();
+  private final List<DeleteFile> v2Deletes = Lists.newArrayList();
+  private final Map<String, List<DeleteFile>> dvsByReferencedFile = 
Maps.newLinkedHashMap();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = 
Lists.newArrayList();
-  private final SnapshotSummary.Builder addedFilesSummary = 
SnapshotSummary.builder();
+  private final SnapshotSummary.Builder addedDataFilesSummary = 
SnapshotSummary.builder();
+  private final SnapshotSummary.Builder addedDeleteFilesSummary = 
SnapshotSummary.builder();
   private final SnapshotSummary.Builder appendedManifestsSummary = 
SnapshotSummary.builder();
   private Expression deleteExpression = Expressions.alwaysFalse();
 
@@ -222,7 +230,8 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
   }
 
   protected boolean addsDeleteFiles() {
-    return !newDeleteFilesBySpec.isEmpty();
+    return !v2Deletes.isEmpty()
+        || dvsByReferencedFile.values().stream().anyMatch(dvs -> 
!dvs.isEmpty());
   }
 
   /** Add a data file to the new snapshot. */
@@ -238,7 +247,7 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
     DataFileSet dataFiles =
         newDataFilesBySpec.computeIfAbsent(spec.specId(), ignored -> 
DataFileSet.create());
     if (dataFiles.add(Delegates.suppressFirstRowId(file))) {
-      addedFilesSummary.addedFile(spec, file);
+      addedDataFilesSummary.addedFile(spec, file);
       hasNewDataFiles = true;
     }
   }
@@ -265,15 +274,14 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
         "Cannot find partition spec %s for delete file: %s",
         file.specId(),
         file.location());
-
-    DeleteFileSet deleteFiles =
-        newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> 
DeleteFileSet.create());
-    if (deleteFiles.add(file)) {
-      addedFilesSummary.addedFile(spec, file);
-      hasNewDeleteFiles = true;
-      if (ContentFileUtil.isDV(file)) {
-        newDVRefs.add(file.referencedDataFile());
-      }
+    hasNewDeleteFiles = true;
+    if (ContentFileUtil.isDV(file)) {
+      List<DeleteFile> dvsForReferencedFile =
+          dvsByReferencedFile.computeIfAbsent(
+              file.referencedDataFile(), newFile -> Lists.newArrayList());
+      dvsForReferencedFile.add(file);
+    } else {
+      v2Deletes.add(file);
     }
   }
 
@@ -814,7 +822,7 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
       Expression conflictDetectionFilter,
       Snapshot parent) {
     // skip if there is no current table state or this operation doesn't add 
new DVs
-    if (parent == null || newDVRefs.isEmpty()) {
+    if (parent == null || dvsByReferencedFile.isEmpty()) {
       return;
     }
 
@@ -847,7 +855,7 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
         DeleteFile file = entry.file();
         if (newSnapshotIds.contains(entry.snapshotId()) && 
ContentFileUtil.isDV(file)) {
           ValidationException.check(
-              !newDVRefs.contains(file.referencedDataFile()),
+              !dvsByReferencedFile.containsKey(file.referencedDataFile()),
               "Found concurrently added DV for %s: %s",
               file.referencedDataFile(),
               ContentFileUtil.dvDesc(file));
@@ -953,7 +961,8 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
 
     // update the snapshot summary
     summaryBuilder.clear();
-    summaryBuilder.merge(addedFilesSummary);
+    summaryBuilder.merge(addedDataFilesSummary);
+    summaryBuilder.merge(addedDeleteFilesSummary);
     summaryBuilder.merge(appendedManifestsSummary);
     summaryBuilder.merge(filterManager.buildSummary(filtered));
     summaryBuilder.merge(deleteFilterManager.buildSummary(filteredDeletes));
@@ -1061,7 +1070,7 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
   }
 
   private Iterable<ManifestFile> prepareDeleteManifests() {
-    if (newDeleteFilesBySpec.isEmpty()) {
+    if (!addsDeleteFiles()) {
       return ImmutableList.of();
     }
 
@@ -1076,12 +1085,22 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
       // this triggers a rewrite of all delete manifests even if there is only 
one new delete file
       // if there is a relevant use case in the future, the behavior can be 
optimized
       cachedNewDeleteManifests.clear();
+      // On cache invalidation of delete files, clear the summary because any
+      // new DV could require a merge, and the summary cannot be generated
+      // until after merging is complete.
+      addedDeleteFilesSummary.clear();
     }
 
     if (cachedNewDeleteManifests.isEmpty()) {
+      List<DeleteFile> mergedDVs = mergeDVs();
+      Map<Integer, List<DeleteFile>> newDeleteFilesBySpec =
+          Streams.stream(Iterables.concat(mergedDVs, 
DeleteFileSet.of(v2Deletes)))
+              .collect(Collectors.groupingBy(ContentFile::specId));
+
       newDeleteFilesBySpec.forEach(
           (specId, deleteFiles) -> {
             PartitionSpec spec = ops().current().spec(specId);
+            deleteFiles.forEach(file -> 
addedDeleteFilesSummary.addedFile(spec, file));
             List<ManifestFile> newDeleteManifests = 
writeDeleteManifests(deleteFiles, spec);
             cachedNewDeleteManifests.addAll(newDeleteManifests);
           });
@@ -1092,6 +1111,35 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
     return cachedNewDeleteManifests;
   }
 
+  private List<DeleteFile> mergeDVs() {
+    for (Map.Entry<String, List<DeleteFile>> entry : 
dvsByReferencedFile.entrySet()) {
+      if (entry.getValue().size() > 1) {
+        LOG.warn(
+            "Merging {} duplicate DVs for data file {} in table {}.",
+            entry.getValue().size(),
+            entry.getKey(),
+            tableName);
+      }
+    }
+
+    FileIO fileIO = EncryptingFileIO.combine(ops().io(), ops().encryption());
+
+    String dvOutputLocation =
+        ops()
+            .locationProvider()
+            .newDataLocation(
+                FileFormat.PUFFIN.addExtension(
+                    String.format(
+                        "merged-dvs-%s-%s", snapshotId(), 
dvMergeAttempt.incrementAndGet())));
+
+    return DVUtil.mergeAndWriteDVsIfRequired(
+        dvsByReferencedFile,
+        dvOutputLocation,
+        fileIO,
+        ops().current().specsById(),
+        ThreadPools.getDeleteWorkerPool());
+  }
+
   private class DataFileFilterManager extends ManifestFilterManager<DataFile> {
     private DataFileFilterManager() {
       super(ops().current().specsById(), 
MergingSnapshotProducer.this::workerPool);
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java 
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index cbae25132d..ffbebf5998 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -726,9 +726,6 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
 
     try (RollingManifestWriter<DeleteFile> closableWriter = writer) {
       for (DeleteFile file : files) {
-        Preconditions.checkArgument(
-            file instanceof Delegates.PendingDeleteFile,
-            "Invalid delete file: must be PendingDeleteFile");
         if (file.dataSequenceNumber() != null) {
           closableWriter.add(file, file.dataSequenceNumber());
         } else {
diff --git 
a/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java 
b/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java
index 6eabd64514..348b4d48d0 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileMetadata;
@@ -29,8 +30,8 @@ import org.apache.iceberg.IcebergBuild;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.puffin.Blob;
 import org.apache.iceberg.puffin.BlobMetadata;
@@ -51,7 +52,7 @@ public class BaseDVFileWriter implements DVFileWriter {
   private static final String REFERENCED_DATA_FILE_KEY = 
"referenced-data-file";
   private static final String CARDINALITY_KEY = "cardinality";
 
-  private final OutputFileFactory fileFactory;
+  private final Supplier<OutputFile> dvOutputFile;
   private final Function<String, PositionDeleteIndex> loadPreviousDeletes;
   private final Map<String, Deletes> deletesByPath = Maps.newHashMap();
   private final Map<String, BlobMetadata> blobsByPath = Maps.newHashMap();
@@ -59,7 +60,13 @@ public class BaseDVFileWriter implements DVFileWriter {
 
   public BaseDVFileWriter(
       OutputFileFactory fileFactory, Function<String, PositionDeleteIndex> 
loadPreviousDeletes) {
-    this.fileFactory = fileFactory;
+    this(() -> fileFactory.newOutputFile().encryptingOutputFile(), 
loadPreviousDeletes);
+  }
+
+  public BaseDVFileWriter(
+      Supplier<OutputFile> dvOutputFile,
+      Function<String, PositionDeleteIndex> loadPreviousDeletes) {
+    this.dvOutputFile = dvOutputFile;
     this.loadPreviousDeletes = loadPreviousDeletes;
   }
 
@@ -71,6 +78,17 @@ public class BaseDVFileWriter implements DVFileWriter {
     positions.delete(pos);
   }
 
+  @Override
+  public void delete(
+      String path,
+      PositionDeleteIndex positionDeleteIndex,
+      PartitionSpec spec,
+      StructLike partition) {
+    Deletes deletes =
+        deletesByPath.computeIfAbsent(path, key -> new Deletes(path, spec, 
partition));
+    deletes.positions().merge(positionDeleteIndex);
+  }
+
   @Override
   public DeleteWriteResult result() {
     Preconditions.checkState(result != null, "Cannot get result from unclosed 
writer");
@@ -148,7 +166,7 @@ public class BaseDVFileWriter implements DVFileWriter {
   }
 
   private PuffinWriter newWriter() {
-    EncryptedOutputFile outputFile = fileFactory.newOutputFile();
+    OutputFile outputFile = dvOutputFile.get();
     return 
Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build();
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java 
b/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java
index 2561f7be3d..88d7e02f8c 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java
@@ -43,4 +43,21 @@ public interface DVFileWriter extends Closeable {
    * @return the writer result
    */
   DeleteWriteResult result();
+
+  /**
+   * Marks every position that is deleted in positionDeleteIndex as deleted in 
the given data file.
+   * Implementations should merge with existing position indices for the 
provided path
+   *
+   * @param path the data file path
+   * @param positionDeleteIndex the position delete index containing all the 
positions to delete
+   * @param spec the data file partition spec
+   * @param partition the data file partition
+   */
+  default void delete(
+      String path,
+      PositionDeleteIndex positionDeleteIndex,
+      PartitionSpec spec,
+      StructLike partition) {
+    positionDeleteIndex.forEach(position -> this.delete(path, position, spec, 
partition));
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/io/IOUtil.java 
b/core/src/main/java/org/apache/iceberg/io/IOUtil.java
index 37962d322d..13abcc160e 100644
--- a/core/src/main/java/org/apache/iceberg/io/IOUtil.java
+++ b/core/src/main/java/org/apache/iceberg/io/IOUtil.java
@@ -49,6 +49,31 @@ public class IOUtil {
     }
   }
 
+  /**
+   * Reads exactly {@code length} bytes from the input file starting at {@code 
fileOffset} into the
+   * buffer. Uses range reads when supported.
+   *
+   * @param inputFile the file to read from
+   * @param fileOffset the position in the file to start reading from
+   * @param bytes a buffer to write into
+   * @param offset starting offset in the buffer for the data
+   * @param length number of bytes to read
+   * @throws IOException if there is an error while reading or if the end of 
the stream is reached
+   *     before reading length bytes
+   */
+  public static void readFully(
+      InputFile inputFile, long fileOffset, byte[] bytes, int offset, int 
length)
+      throws IOException {
+    try (SeekableInputStream stream = inputFile.newStream()) {
+      if (stream instanceof RangeReadable) {
+        ((RangeReadable) stream).readFully(fileOffset, bytes, offset, length);
+      } else {
+        stream.seek(fileOffset);
+        readFully(stream, bytes, offset, length);
+      }
+    }
+  }
+
   /** Writes a buffer into a stream, making multiple write calls if necessary. 
*/
   public static void writeFully(OutputStream outputStream, ByteBuffer buffer) 
throws IOException {
     if (!buffer.hasRemaining()) {
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java 
b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index 59a73ba202..749dbdc6fe 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -35,19 +35,27 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 import java.util.stream.Stream;
 import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.deletes.BaseDVFileWriter;
+import org.apache.iceberg.deletes.DVFileWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ContentFileUtil;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -1919,6 +1927,266 @@ public class TestRowDelta extends TestBase {
         .hasMessageContaining("Found concurrently added DV for %s", 
dataFile.location());
   }
 
+  @TestTemplate
+  public void testDuplicateDVsAreMerged() throws IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    DataFile dataFile = newDataFile("data_bucket=0");
+    commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, 1, 
1).format(FileFormat.PUFFIN).build();
+
+    DeleteFile deleteFile1 = dvWithPositions(dataFile, fileFactory, 0, 2);
+    DeleteFile deleteFile2 = dvWithPositions(dataFile, fileFactory, 2, 4);
+    DeleteFile deleteFile3 = dvWithPositions(dataFile, fileFactory, 4, 8);
+    RowDelta rowDelta1 =
+        
table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).addDeletes(deleteFile3);
+
+    commit(table, rowDelta1, branch);
+
+    Iterable<DeleteFile> addedDeleteFiles =
+        latestSnapshot(table, branch).addedDeleteFiles(table.io());
+    assertThat(Iterables.size(addedDeleteFiles)).isEqualTo(1);
+    DeleteFile mergedDV = Iterables.getOnlyElement(addedDeleteFiles);
+
+    assertDVHasDeletedPositions(mergedDV, LongStream.range(0, 
8).boxed()::iterator);
+  }
+
+  @TestTemplate
+  public void testDuplicateDVsMergedMultipleSpecs() throws IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // append a partitioned data file
+    DataFile firstSnapshotDataFile = newDataFile("data_bucket=0");
+    commit(table, table.newAppend().appendFile(firstSnapshotDataFile), branch);
+
+    // remove the only partition field to make the spec unpartitioned
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+
+    // append an unpartitioned data file
+    DataFile secondSnapshotDataFile = newDataFile("");
+    commit(table, table.newAppend().appendFile(secondSnapshotDataFile), 
branch);
+
+    // evolve the spec and add a new partition field
+    table.updateSpec().addField("data").commit();
+
+    // append a data file with the new spec
+    DataFile thirdSnapshotDataFile = newDataFile("data=abc");
+    commit(table, table.newAppend().appendFile(thirdSnapshotDataFile), branch);
+
+    assertThat(table.specs()).hasSize(3);
+
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, 1, 
1).format(FileFormat.PUFFIN).build();
+
+    DataFile dataFile = newDataFile("data=xyz");
+    // For each data file, create two DVs covering positions [0,2) and [2,4)
+    DeleteFile deleteFile1a = dvWithPositions(firstSnapshotDataFile, 
fileFactory, 0, 2);
+    DeleteFile deleteFile1b = dvWithPositions(firstSnapshotDataFile, 
fileFactory, 2, 4);
+    DeleteFile deleteFile2a = dvWithPositions(secondSnapshotDataFile, 
fileFactory, 0, 2);
+    DeleteFile deleteFile2b = dvWithPositions(secondSnapshotDataFile, 
fileFactory, 2, 4);
+    DeleteFile deleteFile3a = dvWithPositions(thirdSnapshotDataFile, 
fileFactory, 0, 2);
+    DeleteFile deleteFile3b = dvWithPositions(thirdSnapshotDataFile, 
fileFactory, 2, 4);
+
+    commit(
+        table,
+        table
+            .newRowDelta()
+            .addRows(dataFile)
+            .addDeletes(deleteFile1a)
+            .addDeletes(deleteFile1b)
+            .addDeletes(deleteFile2a)
+            .addDeletes(deleteFile2b)
+            .addDeletes(deleteFile3a)
+            .addDeletes(deleteFile3b),
+        branch);
+
+    Snapshot snapshot = latestSnapshot(table, branch);
+    // Expect 3 merged DVs, one per data file
+    Iterable<DeleteFile> addedDeleteFiles = 
snapshot.addedDeleteFiles(table.io());
+    List<DeleteFile> mergedDVs = Lists.newArrayList(addedDeleteFiles);
+    assertThat(mergedDVs).hasSize(3);
+    // Should be a Puffin produced per merged DV spec
+    
assertThat(mergedDVs.stream().map(ContentFile::location).collect(Collectors.toSet()))
+        .hasSize(1);
+
+    DeleteFile committedDVForDataFile1 =
+        Iterables.getOnlyElement(
+            mergedDVs.stream()
+                .filter(
+                    dv -> Objects.equals(dv.referencedDataFile(), 
firstSnapshotDataFile.location()))
+                .collect(Collectors.toList()));
+    assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0, 
4).boxed()::iterator);
+
+    DeleteFile committedDVForDataFile2 =
+        Iterables.getOnlyElement(
+            mergedDVs.stream()
+                .filter(
+                    dv ->
+                        Objects.equals(dv.referencedDataFile(), 
secondSnapshotDataFile.location()))
+                .collect(Collectors.toList()));
+    assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0, 
4).boxed()::iterator);
+
+    DeleteFile committedDVForDataFile3 =
+        Iterables.getOnlyElement(
+            mergedDVs.stream()
+                .filter(
+                    dv -> Objects.equals(dv.referencedDataFile(), 
thirdSnapshotDataFile.location()))
+                .collect(Collectors.toList()));
+    assertDVHasDeletedPositions(committedDVForDataFile3, LongStream.range(0, 
4).boxed()::iterator);
+  }
+
+  @TestTemplate
+  public void testDuplicateDVsAreMergedForMultipleReferenceFiles() throws 
IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    DataFile dataFile1 = newDataFile("data_bucket=0");
+    DataFile dataFile2 = newDataFile("data_bucket=0");
+    commit(table, table.newRowDelta().addRows(dataFile1).addRows(dataFile2), 
branch);
+
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, 1, 
1).format(FileFormat.PUFFIN).build();
+
+    // For each data file, create two DVs covering positions [0,2) and [2,4)
+    DeleteFile deleteFile1a = dvWithPositions(dataFile1, fileFactory, 0, 2);
+    DeleteFile deleteFile1b = dvWithPositions(dataFile1, fileFactory, 2, 4);
+    DeleteFile deleteFile2a = dvWithPositions(dataFile2, fileFactory, 0, 2);
+    DeleteFile deleteFile2b = dvWithPositions(dataFile2, fileFactory, 2, 4);
+
+    // Commit all four duplicate DVs
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(deleteFile1a)
+            .addDeletes(deleteFile1b)
+            .addDeletes(deleteFile2a)
+            .addDeletes(deleteFile2b);
+
+    commit(table, rowDelta, branch);
+
+    // Expect two merged DVs, one per data file
+    Iterable<DeleteFile> addedDeleteFiles =
+        latestSnapshot(table, branch).addedDeleteFiles(table.io());
+    List<DeleteFile> mergedDVs = Lists.newArrayList(addedDeleteFiles);
+
+    assertThat(mergedDVs).hasSize(2);
+    // Should be a single Puffin produced
+    
assertThat(mergedDVs.stream().map(ContentFile::location).collect(Collectors.toSet()))
+        .hasSize(1);
+
+    DeleteFile committedDVForDataFile1 =
+        Iterables.getOnlyElement(
+            mergedDVs.stream()
+                .filter(dv -> Objects.equals(dv.referencedDataFile(), 
dataFile1.location()))
+                .collect(Collectors.toList()));
+    assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0, 
4).boxed()::iterator);
+
+    DeleteFile committedDVForDataFile2 =
+        Iterables.getOnlyElement(
+            mergedDVs.stream()
+                .filter(dv -> Objects.equals(dv.referencedDataFile(), 
dataFile2.location()))
+                .collect(Collectors.toList()));
+    assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0, 
4).boxed()::iterator);
+  }
+
+  @TestTemplate
+  public void testDuplicateDVsAndValidDV() throws IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    DataFile dataFile1 = newDataFile("data_bucket=0");
+    DataFile dataFile2 = newDataFile("data_bucket=0");
+    commit(table, table.newRowDelta().addRows(dataFile1).addRows(dataFile2), 
branch);
+
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, 1, 
1).format(FileFormat.PUFFIN).build();
+
+    // dataFile1 has duplicate DVs that need merging
+    DeleteFile deleteFile1a = dvWithPositions(dataFile1, fileFactory, 0, 2);
+    DeleteFile deleteFile1b = dvWithPositions(dataFile1, fileFactory, 2, 4);
+
+    // dataFile2 has a valid DV
+    DeleteFile deleteFile2 = dvWithPositions(dataFile2, fileFactory, 0, 3);
+
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(deleteFile1a)
+            .addDeletes(deleteFile1b)
+            .addDeletes(deleteFile2);
+
+    commit(table, rowDelta, branch);
+
+    // Expect two DVs: one merged for dataFile1 and deleteFile2
+    Iterable<DeleteFile> addedDeleteFiles =
+        latestSnapshot(table, branch).addedDeleteFiles(table.io());
+    List<DeleteFile> committedDVs = Lists.newArrayList(addedDeleteFiles);
+
+    assertThat(committedDVs).hasSize(2);
+
+    // Verify merged DV for dataFile1 has positions [0,4)
+    DeleteFile committedDVForDataFile1 =
+        Iterables.getOnlyElement(
+            committedDVs.stream()
+                .filter(dv -> Objects.equals(dv.referencedDataFile(), 
dataFile1.location()))
+                .collect(Collectors.toList()));
+    assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0, 
4).boxed()::iterator);
+
+    // Verify deleteFile2 state
+    DeleteFile committedDVForDataFile2 =
+        Iterables.getOnlyElement(
+            committedDVs.stream()
+                .filter(dv -> Objects.equals(dv.referencedDataFile(), 
dataFile2.location()))
+                .collect(Collectors.toList()));
+    assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0, 
3).boxed()::iterator);
+  }
+
+  @TestTemplate
+  public void testDuplicateDVsAreMergedAndEqDelete() throws IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    DataFile dataFile = newDataFile("data_bucket=0");
+    commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, 1, 
1).format(FileFormat.PUFFIN).build();
+
+    // Two DVs for the same data file: [0,2) and [2,4) => 4 deleted positions 
total
+    DeleteFile dv1 = dvWithPositions(dataFile, fileFactory, 0, 2);
+    DeleteFile dv2 = dvWithPositions(dataFile, fileFactory, 2, 4);
+
+    // One equality delete file for the same partition
+    DeleteFile eqDelete =
+        newEqualityDeleteFile(
+            table.spec().specId(),
+            "data_bucket=0",
+            table.schema().asStruct().fields().get(0).fieldId());
+
+    RowDelta rowDelta = 
table.newRowDelta().addDeletes(eqDelete).addDeletes(dv1).addDeletes(dv2);
+
+    commit(table, rowDelta, branch);
+
+    Iterable<DeleteFile> addedDeleteFiles =
+        latestSnapshot(table, branch).addedDeleteFiles(table.io());
+    List<DeleteFile> committedDeletes = Lists.newArrayList(addedDeleteFiles);
+
+    // 1 DV + 1 equality delete
+    assertThat(committedDeletes).hasSize(2);
+
+    DeleteFile committedDV =
+        Iterables.getOnlyElement(
+            
committedDeletes.stream().filter(ContentFileUtil::isDV).collect(Collectors.toList()));
+    assertDVHasDeletedPositions(committedDV, LongStream.range(0, 
4).boxed()::iterator);
+
+    DeleteFile committedEqDelete =
+        Iterables.getOnlyElement(
+            committedDeletes.stream()
+                .filter(df -> df.content() == FileContent.EQUALITY_DELETES)
+                .collect(Collectors.toList()));
+    assertThat(committedEqDelete).isNotNull();
+    
assertThat(committedEqDelete.content()).isEqualTo(FileContent.EQUALITY_DELETES);
+  }
+
   @TestTemplate
   public void testManifestMergingAfterUpgradeToV3() {
     assumeThat(formatVersion).isEqualTo(2);
@@ -2001,4 +2269,142 @@ public class TestRowDelta extends TestBase {
       throw new RuntimeException(e);
     }
   }
+
+  @TestTemplate
+  public void testCannotMergeDVsMismatchedSequenceNumbers() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    DataFile dataFile = newDataFile("data_bucket=0");
+    commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+    DeleteFile dv1 =
+        FileMetadata.deleteFileBuilder(table.spec())
+            .ofPositionDeletes()
+            .withFormat(FileFormat.PUFFIN)
+            .withPath("/tmp/dv-1.puffin")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withPartition(dataFile.partition())
+            .withReferencedDataFile(dataFile.location())
+            .withContentOffset(0)
+            .withContentSizeInBytes(10)
+            .build();
+    DeleteFile dv2 =
+        
FileMetadata.deleteFileBuilder(table.spec()).copy(dv1).withPath("/tmp/dv-2.puffin").build();
+
+    // Use protected add(DeleteFile, long) to assign different data sequence 
numbers
+    MergingSnapshotProducer<?> rowDelta = (MergingSnapshotProducer<?>) 
table.newRowDelta();
+    rowDelta.add(dv1, 1L);
+    rowDelta.add(dv2, 2L);
+
+    assertThatThrownBy(() -> commit(table, rowDelta, branch))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot merge DVs, mismatched sequence numbers");
+  }
+
+  @TestTemplate
+  public void testCannotMergeDVsMismatchedPartitionSpecs() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    DataFile dataFile = newDataFile("data_bucket=0");
+    commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+    // Evolve the spec so we have two distinct spec IDs
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    PartitionSpec originalSpec = table.specs().get(0);
+    PartitionSpec evolvedSpec = table.specs().get(1);
+
+    DeleteFile dv1 =
+        FileMetadata.deleteFileBuilder(originalSpec)
+            .ofPositionDeletes()
+            .withFormat(FileFormat.PUFFIN)
+            .withPath("/tmp/dv-1.puffin")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withPartition(dataFile.partition())
+            .withReferencedDataFile(dataFile.location())
+            .withContentOffset(0)
+            .withContentSizeInBytes(10)
+            .build();
+    DeleteFile dv2 =
+        
FileMetadata.deleteFileBuilder(evolvedSpec).copy(dv1).withPath("/tmp/dv-2.puffin").build();
+
+    assertThatThrownBy(
+            () -> commit(table, 
table.newRowDelta().addDeletes(dv1).addDeletes(dv2), branch))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot merge DVs, mismatched partition specs");
+  }
+
+  @TestTemplate
+  public void testCannotMergeDVsMismatchedPartitionTuples() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    DataFile dataFile = newDataFile("data_bucket=0");
+    commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+    DeleteFile dv1 =
+        FileMetadata.deleteFileBuilder(table.spec())
+            .ofPositionDeletes()
+            .withFormat(FileFormat.PUFFIN)
+            .withPath("/tmp/dv-1.puffin")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withPartitionPath("data_bucket=0")
+            .withReferencedDataFile(dataFile.location())
+            .withContentOffset(0)
+            .withContentSizeInBytes(10)
+            .build();
+    DeleteFile dv2 =
+        FileMetadata.deleteFileBuilder(table.spec())
+            .copy(dv1)
+            .withPath("/tmp/dv-2.puffin")
+            .withPartitionPath("data_bucket=1")
+            .build();
+
+    assertThatThrownBy(
+            () -> commit(table, 
table.newRowDelta().addDeletes(dv1).addDeletes(dv2), branch))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot merge DVs, mismatched partition tuples");
+  }
+
+  private DeleteFile dvWithPositions(
+      DataFile dataFile, OutputFileFactory fileFactory, int fromInclusive, int 
toExclusive)
+      throws IOException {
+
+    List<PositionDelete<?>> deletes = Lists.newArrayList();
+    for (int i = fromInclusive; i < toExclusive; i++) {
+      deletes.add(PositionDelete.create().set(dataFile.location(), i));
+    }
+
+    return writeDV(deletes, dataFile.specId(), dataFile.partition(), 
fileFactory);
+  }
+
+  private void assertDVHasDeletedPositions(DeleteFile dv, Iterable<Long> 
positions) {
+    assertThat(dv).isNotNull();
+    PositionDeleteIndex index = DVUtil.readDV(dv, table.io());
+    assertThat(positions)
+        .allSatisfy(
+            pos ->
+                assertThat(index.isDeleted(pos))
+                    .as("Expected position %s to be deleted", pos)
+                    .isTrue());
+  }
+
+  private DeleteFile writeDV(
+      List<PositionDelete<?>> deletes,
+      int specId,
+      StructLike partition,
+      OutputFileFactory fileFactory)
+      throws IOException {
+
+    DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null);
+    try (DVFileWriter closeableWriter = writer) {
+      for (PositionDelete<?> delete : deletes) {
+        closeableWriter.delete(
+            delete.path().toString(), delete.pos(), table.specs().get(specId), 
partition);
+      }
+    }
+
+    return Iterables.getOnlyElement(writer.result().deleteFiles());
+  }
 }
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java 
b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
index 8dbb9dd44b..02b06b70e4 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -39,13 +39,11 @@ import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.IOUtil;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.RangeReadable;
-import org.apache.iceberg.io.SeekableInputStream;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.CharSequenceMap;
 import org.apache.iceberg.util.ContentFileUtil;
@@ -175,8 +173,13 @@ public class BaseDeleteLoader implements DeleteLoader {
     InputFile inputFile = loadInputFile.apply(dv);
     long offset = dv.contentOffset();
     int length = dv.contentSizeInBytes().intValue();
-    byte[] bytes = readBytes(inputFile, offset, length);
-    return PositionDeleteIndex.deserialize(bytes, dv);
+    byte[] bytes = new byte[length];
+    try {
+      IOUtil.readFully(inputFile, offset, bytes, 0, length);
+      return PositionDeleteIndex.deserialize(bytes, dv);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
   }
 
   private PositionDeleteIndex getOrReadPosDeletes(
@@ -279,22 +282,4 @@ public class BaseDeleteLoader implements DeleteLoader {
         filePath,
         dv.referencedDataFile());
   }
-
-  private static byte[] readBytes(InputFile inputFile, long offset, int 
length) {
-    try (SeekableInputStream stream = inputFile.newStream()) {
-      byte[] bytes = new byte[length];
-
-      if (stream instanceof RangeReadable) {
-        RangeReadable rangeReadable = (RangeReadable) stream;
-        rangeReadable.readFully(offset, bytes);
-      } else {
-        stream.seek(offset);
-        ByteStreams.readFully(stream, bytes);
-      }
-
-      return bytes;
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-  }
 }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index c30a730917..81954c19b5 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -409,7 +409,7 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
 
     // Add position deletes for both partitions
     Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, 
dataFileA, "a");
-    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileA, "b");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileB, "b");
 
     
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
 
@@ -455,7 +455,7 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
         deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
     Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
-        deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"});
+        deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"});
     
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
 
     // Prepare expected values
@@ -496,7 +496,7 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     DataFile dataFileB = dataFile(tab, "b");
     tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
     Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, 
dataFileA, "a");
-    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileA, "b");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileB, "b");
     
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
 
     // Switch partition spec from (data) to (id)
@@ -508,7 +508,7 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit();
 
     Pair<List<PositionDelete<?>>, DeleteFile> deletes10 = deleteFile(tab, 
dataFile10, 10);
-    Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab, 
dataFile10, 99);
+    Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab, 
dataFile99, 99);
     
tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit();
 
     // Query partition of old spec
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
index 76084c2b94..4df99ca199 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
@@ -77,7 +77,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
           .build();
   static final DataFile FILE_A2 =
       DataFiles.builder(SPEC)
-          .withPath("/path/to/data-a.parquet")
+          .withPath("/path/to/data-a2.parquet")
           .withFileSizeInBytes(10)
           .withPartitionPath("c1=a") // easy way to set partition data for now
           .withRecordCount(1)
@@ -91,7 +91,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
           .build();
   static final DataFile FILE_B2 =
       DataFiles.builder(SPEC)
-          .withPath("/path/to/data-b.parquet")
+          .withPath("/path/to/data-b2.parquet")
           .withFileSizeInBytes(10)
           .withPartitionPath("c1=b") // easy way to set partition data for now
           .withRecordCount(1)
@@ -105,7 +105,7 @@ public class TestRemoveDanglingDeleteAction extends 
TestBase {
           .build();
   static final DataFile FILE_C2 =
       DataFiles.builder(SPEC)
-          .withPath("/path/to/data-c.parquet")
+          .withPath("/path/to/data-c2.parquet")
           .withFileSizeInBytes(10)
           .withPartitionPath("c1=c") // easy way to set partition data for now
           .withRecordCount(1)
@@ -119,7 +119,7 @@ public class TestRemoveDanglingDeleteAction extends 
TestBase {
           .build();
   static final DataFile FILE_D2 =
       DataFiles.builder(SPEC)
-          .withPath("/path/to/data-d.parquet")
+          .withPath("/path/to/data-d2.parquet")
           .withFileSizeInBytes(10)
           .withPartitionPath("c1=d") // easy way to set partition data for now
           .withRecordCount(1)
@@ -370,7 +370,6 @@ public class TestRemoveDanglingDeleteAction extends 
TestBase {
     // Add Data Files with EQ and POS deletes
     DeleteFile fileADeletes = fileADeletes();
     DeleteFile fileA2Deletes = fileA2Deletes();
-    DeleteFile fileBDeletes = fileBDeletes();
     DeleteFile fileB2Deletes = fileB2Deletes();
     table
         .newRowDelta()
@@ -382,7 +381,6 @@ public class TestRemoveDanglingDeleteAction extends 
TestBase {
         .addDeletes(fileA2Deletes)
         .addDeletes(FILE_A_EQ_DELETES)
         .addDeletes(FILE_A2_EQ_DELETES)
-        .addDeletes(fileBDeletes)
         .addDeletes(fileB2Deletes)
         .addDeletes(FILE_B_EQ_DELETES)
         .addDeletes(FILE_B2_EQ_DELETES)
@@ -400,7 +398,6 @@ public class TestRemoveDanglingDeleteAction extends 
TestBase {
             Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()),
             Tuple2.apply(2L, fileA2Deletes.location()),
             Tuple2.apply(2L, FILE_B_EQ_DELETES.location()),
-            Tuple2.apply(2L, fileBDeletes.location()),
             Tuple2.apply(2L, FILE_B2.location()),
             Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()),
             Tuple2.apply(2L, fileB2Deletes.location()),
@@ -433,7 +430,6 @@ public class TestRemoveDanglingDeleteAction extends 
TestBase {
             Tuple2.apply(2L, FILE_A2.location()),
             Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()),
             Tuple2.apply(2L, fileA2Deletes.location()),
-            Tuple2.apply(2L, fileBDeletes.location()),
             Tuple2.apply(2L, FILE_B2.location()),
             Tuple2.apply(2L, fileB2Deletes.location()),
             Tuple2.apply(2L, FILE_C2.location()),
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 87cbbe3cea..8032b0b782 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -409,7 +409,7 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
 
     // Add position deletes for both partitions
     Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, 
dataFileA, "a");
-    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileA, "b");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileB, "b");
 
     
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
 
@@ -455,7 +455,7 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
         deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
     Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
-        deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"});
+        deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"});
     
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
 
     // Prepare expected values
@@ -496,7 +496,7 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     DataFile dataFileB = dataFile(tab, "b");
     tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
     Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, 
dataFileA, "a");
-    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileA, "b");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileB, "b");
     
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
 
     // Switch partition spec from (data) to (id)
@@ -508,7 +508,7 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit();
 
     Pair<List<PositionDelete<?>>, DeleteFile> deletes10 = deleteFile(tab, 
dataFile10, 10);
-    Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab, 
dataFile10, 99);
+    Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab, 
dataFile99, 99);
     
tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit();
 
     // Query partition of old spec
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
index 76084c2b94..1e4c21d214 100644
--- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
@@ -77,7 +77,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
           .build();
   static final DataFile FILE_A2 =
       DataFiles.builder(SPEC)
-          .withPath("/path/to/data-a.parquet")
+          .withPath("/path/to/data-a2.parquet")
           .withFileSizeInBytes(10)
           .withPartitionPath("c1=a") // easy way to set partition data for now
           .withRecordCount(1)
@@ -91,7 +91,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
           .build();
   static final DataFile FILE_B2 =
       DataFiles.builder(SPEC)
-          .withPath("/path/to/data-b.parquet")
+          .withPath("/path/to/data-b2.parquet")
           .withFileSizeInBytes(10)
           .withPartitionPath("c1=b") // easy way to set partition data for now
           .withRecordCount(1)
@@ -105,7 +105,7 @@ public class TestRemoveDanglingDeleteAction extends 
TestBase {
           .build();
   static final DataFile FILE_C2 =
       DataFiles.builder(SPEC)
-          .withPath("/path/to/data-c.parquet")
+          .withPath("/path/to/data-c2.parquet")
           .withFileSizeInBytes(10)
           .withPartitionPath("c1=c") // easy way to set partition data for now
           .withRecordCount(1)
@@ -370,7 +370,6 @@ public class TestRemoveDanglingDeleteAction extends 
TestBase {
     // Add Data Files with EQ and POS deletes
     DeleteFile fileADeletes = fileADeletes();
     DeleteFile fileA2Deletes = fileA2Deletes();
-    DeleteFile fileBDeletes = fileBDeletes();
     DeleteFile fileB2Deletes = fileB2Deletes();
     table
         .newRowDelta()
@@ -382,7 +381,6 @@ public class TestRemoveDanglingDeleteAction extends 
TestBase {
         .addDeletes(fileA2Deletes)
         .addDeletes(FILE_A_EQ_DELETES)
         .addDeletes(FILE_A2_EQ_DELETES)
-        .addDeletes(fileBDeletes)
         .addDeletes(fileB2Deletes)
         .addDeletes(FILE_B_EQ_DELETES)
         .addDeletes(FILE_B2_EQ_DELETES)
@@ -400,7 +398,6 @@ public class TestRemoveDanglingDeleteAction extends 
TestBase {
             Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()),
             Tuple2.apply(2L, fileA2Deletes.location()),
             Tuple2.apply(2L, FILE_B_EQ_DELETES.location()),
-            Tuple2.apply(2L, fileBDeletes.location()),
             Tuple2.apply(2L, FILE_B2.location()),
             Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()),
             Tuple2.apply(2L, fileB2Deletes.location()),
@@ -433,7 +430,6 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
             Tuple2.apply(2L, FILE_A2.location()),
             Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()),
             Tuple2.apply(2L, fileA2Deletes.location()),
-            Tuple2.apply(2L, fileBDeletes.location()),
             Tuple2.apply(2L, FILE_B2.location()),
             Tuple2.apply(2L, fileB2Deletes.location()),
             Tuple2.apply(2L, FILE_C2.location()),
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 7892fd65b4..f5456db8e4 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -409,7 +409,7 @@ public class TestPositionDeletesTable extends CatalogTestBase {
 
     // Add position deletes for both partitions
     Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, 
dataFileA, "a");
-    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileA, "b");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileB, "b");
 
     
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
 
@@ -455,7 +455,7 @@ public class TestPositionDeletesTable extends CatalogTestBase {
     Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
         deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
     Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
-        deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"});
+        deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"});
     
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
 
     // Prepare expected values
@@ -496,7 +496,7 @@ public class TestPositionDeletesTable extends CatalogTestBase {
     DataFile dataFileB = dataFile(tab, "b");
     tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
     Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, 
dataFileA, "a");
-    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileA, "b");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileB, "b");
     
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
 
     // Switch partition spec from (data) to (id)
@@ -508,7 +508,7 @@ public class TestPositionDeletesTable extends CatalogTestBase {
     tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit();
 
     Pair<List<PositionDelete<?>>, DeleteFile> deletes10 = deleteFile(tab, 
dataFile10, 10);
-    Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab, 
dataFile10, 99);
+    Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab, 
dataFile99, 99);
     
tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit();
 
     // Query partition of old spec
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
index 76084c2b94..4df99ca199 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
@@ -77,7 +77,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
           .build();
   static final DataFile FILE_A2 =
       DataFiles.builder(SPEC)
-          .withPath("/path/to/data-a.parquet")
+          .withPath("/path/to/data-a2.parquet")
           .withFileSizeInBytes(10)
           .withPartitionPath("c1=a") // easy way to set partition data for now
           .withRecordCount(1)
@@ -91,7 +91,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
           .build();
   static final DataFile FILE_B2 =
       DataFiles.builder(SPEC)
-          .withPath("/path/to/data-b.parquet")
+          .withPath("/path/to/data-b2.parquet")
           .withFileSizeInBytes(10)
           .withPartitionPath("c1=b") // easy way to set partition data for now
           .withRecordCount(1)
@@ -105,7 +105,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
           .build();
   static final DataFile FILE_C2 =
       DataFiles.builder(SPEC)
-          .withPath("/path/to/data-c.parquet")
+          .withPath("/path/to/data-c2.parquet")
           .withFileSizeInBytes(10)
           .withPartitionPath("c1=c") // easy way to set partition data for now
           .withRecordCount(1)
@@ -119,7 +119,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
           .build();
   static final DataFile FILE_D2 =
       DataFiles.builder(SPEC)
-          .withPath("/path/to/data-d.parquet")
+          .withPath("/path/to/data-d2.parquet")
           .withFileSizeInBytes(10)
           .withPartitionPath("c1=d") // easy way to set partition data for now
           .withRecordCount(1)
@@ -370,7 +370,6 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
     // Add Data Files with EQ and POS deletes
     DeleteFile fileADeletes = fileADeletes();
     DeleteFile fileA2Deletes = fileA2Deletes();
-    DeleteFile fileBDeletes = fileBDeletes();
     DeleteFile fileB2Deletes = fileB2Deletes();
     table
         .newRowDelta()
@@ -382,7 +381,6 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
         .addDeletes(fileA2Deletes)
         .addDeletes(FILE_A_EQ_DELETES)
         .addDeletes(FILE_A2_EQ_DELETES)
-        .addDeletes(fileBDeletes)
         .addDeletes(fileB2Deletes)
         .addDeletes(FILE_B_EQ_DELETES)
         .addDeletes(FILE_B2_EQ_DELETES)
@@ -400,7 +398,6 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
             Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()),
             Tuple2.apply(2L, fileA2Deletes.location()),
             Tuple2.apply(2L, FILE_B_EQ_DELETES.location()),
-            Tuple2.apply(2L, fileBDeletes.location()),
             Tuple2.apply(2L, FILE_B2.location()),
             Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()),
             Tuple2.apply(2L, fileB2Deletes.location()),
@@ -433,7 +430,6 @@ public class TestRemoveDanglingDeleteAction extends TestBase {
             Tuple2.apply(2L, FILE_A2.location()),
             Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()),
             Tuple2.apply(2L, fileA2Deletes.location()),
-            Tuple2.apply(2L, fileBDeletes.location()),
             Tuple2.apply(2L, FILE_B2.location()),
             Tuple2.apply(2L, fileB2Deletes.location()),
             Tuple2.apply(2L, FILE_C2.location()),
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 5641c7b2a0..0e77e70e69 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -409,7 +409,7 @@ public class TestPositionDeletesTable extends CatalogTestBase {
 
     // Add position deletes for both partitions
     Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, 
dataFileA, "a");
-    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileA, "b");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileB, "b");
 
     
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
 
@@ -455,7 +455,7 @@ public class TestPositionDeletesTable extends CatalogTestBase {
     Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
         deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
     Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
-        deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"});
+        deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"});
     
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
 
     // Prepare expected values
@@ -496,7 +496,7 @@ public class TestPositionDeletesTable extends CatalogTestBase {
     DataFile dataFileB = dataFile(tab, "b");
     tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
     Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, 
dataFileA, "a");
-    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileA, "b");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, 
dataFileB, "b");
     
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
 
     // Switch partition spec from (data) to (id)
@@ -508,7 +508,7 @@ public class TestPositionDeletesTable extends CatalogTestBase {
     tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit();
 
     Pair<List<PositionDelete<?>>, DeleteFile> deletes10 = deleteFile(tab, 
dataFile10, 10);
-    Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab, 
dataFile10, 99);
+    Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab, 
dataFile99, 99);
     
tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit();
 
     // Query partition of old spec

Reply via email to