This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 061ae58986 Core: Keep track of data files to be removed for orphaned DV detection (#13222)
061ae58986 is described below

commit 061ae58986db3495ff3af6f1932a96dd086e5fbd
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Jul 8 09:04:01 2025 +0200

    Core: Keep track of data files to be removed for orphaned DV detection (#13222)
---
 .../apache/iceberg/RewriteDataFilesBenchmark.java  | 196 +++++++++++++++++++++
 .../org/apache/iceberg/ManifestFilterManager.java  |  36 +++-
 .../apache/iceberg/MergingSnapshotProducer.java    |  11 ++
 .../java/org/apache/iceberg/TestDeleteFiles.java   |  92 ++++++++++
 .../org/apache/iceberg/TestReplacePartitions.java  |  52 +++++-
 .../java/org/apache/iceberg/TestRewriteFiles.java  |  44 ++++-
 .../test/java/org/apache/iceberg/TestRowDelta.java |  34 ++--
 .../maintenance/api/TestRewriteDataFiles.java      |   3 +-
 .../maintenance/api/TestRewriteDataFiles.java      |   3 +-
 .../maintenance/api/TestRewriteDataFiles.java      |   3 +-
 10 files changed, 440 insertions(+), 34 deletions(-)

diff --git a/core/src/jmh/java/org/apache/iceberg/RewriteDataFilesBenchmark.java b/core/src/jmh/java/org/apache/iceberg/RewriteDataFilesBenchmark.java
new file mode 100644
index 0000000000..b56521548b
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/RewriteDataFilesBenchmark.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DataFileSet;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of rewriting data files in the 
table.
+ *
+ * <p>To run this benchmark: <code>
+ *   ./gradlew :iceberg-core:jmh
+ *       -PjmhIncludeRegex=RewriteDataFilesBenchmark
+ *       -PjmhOutputPath=benchmark/rewrite-data-files-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class RewriteDataFilesBenchmark {
+
+  private static final String TABLE_IDENT = "tblX";
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "int_col", Types.IntegerType.get()),
+          required(2, "long_col", Types.LongType.get()),
+          required(3, "decimal_col", Types.DecimalType.of(10, 10)),
+          required(4, "date_col", Types.DateType.get()),
+          required(5, "timestamp_col", Types.TimestampType.withoutZone()),
+          required(6, "timestamp_tz_col", Types.TimestampType.withZone()),
+          required(7, "str_col", Types.StringType.get()));
+  private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private Table table;
+  private DataFileSet dataFilesToRemove;
+  private DataFileSet dataFilesToAdd;
+
+  @Param({"50000", "100000", "500000", "1000000", "2000000"})
+  private int numFiles;
+
+  @Param({"5", "25", "50", "100"})
+  private int percentDataFilesRewritten;
+
+  @Setup
+  public void setupBenchmark() throws IOException {
+    initTable();
+    initFiles();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    dropTable();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void rewriteDataFiles() {
+    Snapshot currentSnapshot = table.currentSnapshot();
+    RewriteFiles rewriteFiles = table.newRewrite();
+    rewriteFiles.validateFromSnapshot(currentSnapshot.snapshotId());
+    dataFilesToAdd.forEach(rewriteFiles::addFile);
+    dataFilesToRemove.forEach(rewriteFiles::deleteFile);
+    rewriteFiles.commit();
+    table.manageSnapshots().rollbackTo(currentSnapshot.snapshotId()).commit();
+  }
+
+  private void initTable() {
+    if (TABLES.exists(TABLE_IDENT)) {
+      TABLES.dropTable(TABLE_IDENT);
+    }
+
+    this.table =
+        TABLES.create(
+            SCHEMA, SPEC, ImmutableMap.of(TableProperties.FORMAT_VERSION, 
"3"), TABLE_IDENT);
+  }
+
+  private void dropTable() {
+    TABLES.dropTable(TABLE_IDENT);
+  }
+
+  private void initFiles() throws IOException {
+    List<DataFile> pendingDataFiles = 
Lists.newArrayListWithExpectedSize(numFiles);
+    int numDataFilesToRewrite = (int) Math.ceil(numFiles * 
(percentDataFilesRewritten / 100.0));
+    Map<String, DataFile> filesToReplace = 
Maps.newHashMapWithExpectedSize(numDataFilesToRewrite);
+    RowDelta rowDelta = table.newRowDelta();
+    for (int ordinal = 0; ordinal < numFiles; ordinal++) {
+      DataFile dataFile = generateDataFile();
+      rowDelta.addRows(dataFile);
+      DeleteFile deleteFile = FileGenerationUtil.generateDV(table, dataFile);
+      rowDelta.addDeletes(deleteFile);
+      if (numDataFilesToRewrite > 0) {
+        filesToReplace.put(dataFile.location(), dataFile);
+        DataFile pendingDataFile = generateDataFile(dataFile.recordCount());
+        rowDelta.addRows(pendingDataFile);
+        pendingDataFiles.add(pendingDataFile);
+        numDataFilesToRewrite--;
+      }
+    }
+
+    rowDelta.commit();
+
+    List<DataFile> dataFilesReadFromManifests = Lists.newArrayList();
+    for (ManifestFile dataManifest : 
table.currentSnapshot().dataManifests(table.io())) {
+      try (ManifestReader<DataFile> manifestReader = 
ManifestFiles.read(dataManifest, table.io())) {
+        manifestReader
+            .iterator()
+            .forEachRemaining(
+                file -> {
+                  if (filesToReplace.containsKey(file.location())) {
+                    dataFilesReadFromManifests.add(file);
+                  }
+                });
+      }
+    }
+
+    this.dataFilesToRemove = DataFileSet.of(dataFilesReadFromManifests);
+    this.dataFilesToAdd = DataFileSet.of(pendingDataFiles);
+  }
+
+  private DataFile generateDataFile() {
+    return generateDataFile(-1L);
+  }
+
+  private DataFile generateDataFile(long recordCount) {
+    Schema schema = table.schema();
+    PartitionSpec spec = table.spec();
+    LocationProvider locations = table.locationProvider();
+    String path = locations.newDataLocation(spec, null, 
FileGenerationUtil.generateFileName());
+    long fileSize = ThreadLocalRandom.current().nextLong(50_000L);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+    Metrics metrics =
+        FileGenerationUtil.generateRandomMetrics(
+            schema, metricsConfig, ImmutableMap.of(), ImmutableMap.of());
+    if (recordCount > 0) {
+      metrics =
+          new Metrics(
+              recordCount,
+              metrics.columnSizes(),
+              metrics.valueCounts(),
+              metrics.nullValueCounts(),
+              metrics.nanValueCounts());
+    }
+
+    return DataFiles.builder(spec)
+        .withPath(path)
+        .withFileSizeInBytes(fileSize)
+        .withFormat(FileFormat.PARQUET)
+        .withMetrics(metrics)
+        .build();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
index 840d93ec10..9b5dce4467 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
@@ -42,6 +42,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.ManifestFileUtil;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PartitionSet;
@@ -80,6 +81,9 @@ abstract class ManifestFilterManager<F extends 
ContentFile<F>> {
   private int duplicateDeleteCount = 0;
   private boolean caseSensitive = true;
   private boolean allDeletesReferenceManifests = true;
+  // this is only being used for the DeleteManifestFilterManager to detect 
orphaned DVs for removed
+  // data file paths
+  private Set<String> removedDataFilePaths = Sets.newHashSet();
 
   // cache filtered manifests to avoid extra work when commits fail.
   private final Map<ManifestFile, ManifestFile> filteredManifests = 
Maps.newConcurrentMap();
@@ -114,6 +118,10 @@ abstract class ManifestFilterManager<F extends 
ContentFile<F>> {
     this.failMissingDeletePaths = true;
   }
 
+  protected Set<F> filesToBeDeleted() {
+    return deleteFiles;
+  }
+
   /**
    * Add a filter to match files to delete. A file will be deleted if all of 
the rows it contains
    * match this or any other filter passed to this method.
@@ -155,6 +163,11 @@ abstract class ManifestFilterManager<F extends 
ContentFile<F>> {
     this.caseSensitive = newCaseSensitive;
   }
 
+  protected void removeDanglingDeletesFor(Set<DataFile> dataFiles) {
+    this.removedDataFilePaths =
+        
dataFiles.stream().map(ContentFile::location).collect(Collectors.toSet());
+  }
+
   /** Add a specific path to be deleted in the new snapshot. */
   void delete(F file) {
     Preconditions.checkNotNull(file, "Cannot delete file: null");
@@ -224,7 +237,9 @@ abstract class ManifestFilterManager<F extends 
ContentFile<F>> {
   private boolean canTrustManifestReferences(List<ManifestFile> manifests) {
     Set<String> manifestLocations =
         manifests.stream().map(ManifestFile::path).collect(Collectors.toSet());
-    return allDeletesReferenceManifests && 
manifestLocations.containsAll(manifestsWithDeletes);
+    return allDeletesReferenceManifests
+        && !manifestsWithDeletes.isEmpty()
+        && manifestLocations.containsAll(manifestsWithDeletes);
   }
 
   /**
@@ -404,6 +419,8 @@ abstract class ManifestFilterManager<F extends 
ContentFile<F>> {
       return true;
     } else if (!deleteFiles.isEmpty()) {
       return ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, 
specsById);
+    } else if (!removedDataFilePaths.isEmpty()) {
+      return true;
     }
 
     return false;
@@ -427,7 +444,8 @@ abstract class ManifestFilterManager<F extends 
ContentFile<F>> {
               || (isDelete
                   && entry.isLive()
                   && entry.dataSequenceNumber() > 0
-                  && entry.dataSequenceNumber() < minSequenceNumber);
+                  && entry.dataSequenceNumber() < minSequenceNumber)
+              || (isDelete && isDanglingDV((DeleteFile) file));
 
       if (markedForDelete || evaluator.rowsMightMatch(file)) {
         boolean allRowsMatch = markedForDelete || 
evaluator.rowsMustMatch(file);
@@ -452,6 +470,10 @@ abstract class ManifestFilterManager<F extends 
ContentFile<F>> {
     return false;
   }
 
+  private boolean isDanglingDV(DeleteFile file) {
+    return ContentFileUtil.isDV(file) && 
removedDataFilePaths.contains(file.referencedDataFile());
+  }
+
   @SuppressWarnings({"CollectionUndefinedEquality", 
"checkstyle:CyclomaticComplexity"})
   private ManifestFile filterManifestWithDeletedFiles(
       PartitionAndMetricsEvaluator evaluator, ManifestFile manifest, 
ManifestReader<F> reader) {
@@ -468,8 +490,10 @@ abstract class ManifestFilterManager<F extends 
ContentFile<F>> {
             .forEach(
                 entry -> {
                   F file = entry.file();
+                  boolean isDanglingDV = isDelete && isDanglingDV((DeleteFile) 
file);
                   boolean markedForDelete =
-                      deletePaths.contains(file.location())
+                      isDanglingDV
+                          || deletePaths.contains(file.location())
                           || deleteFiles.contains(file)
                           || dropPartitions.contains(file.specId(), 
file.partition())
                           || (isDelete
@@ -488,6 +512,10 @@ abstract class ManifestFilterManager<F extends 
ContentFile<F>> {
 
                     if (allRowsMatch) {
                       writer.delete(entry);
+                      F fileCopy = file.copyWithoutStats();
+                      // add the file here in case it was deleted using an 
expression. The
+                      // DeleteManifestFilterManager will then remove its 
matching DV
+                      deleteFiles.add(fileCopy);
 
                       if (deletedFiles.contains(file)) {
                         LOG.warn(
@@ -498,7 +526,7 @@ abstract class ManifestFilterManager<F extends 
ContentFile<F>> {
                       } else {
                         // only add the file to deletes if it is a new delete
                         // this keeps the snapshot summary accurate for 
non-duplicate data
-                        deletedFiles.add(file.copyWithoutStats());
+                        deletedFiles.add(fileCopy);
                       }
                     } else {
                       writer.existing(entry);
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index cae11dce55..9ed2f4f4c0 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -935,6 +935,12 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
                             .UNASSIGNED_SEQ) // filter out unassigned in 
rewritten manifests
             .reduce(base.lastSequenceNumber(), Math::min);
     deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
+
+    // retrieve the data files to be deleted from the DataFileFilterManager 
and pass it to the
+    // DeleteFileFilterManager so that it can potentially remove orphaned DVs
+    Set<DataFile> filesToBeDeleted = filterManager.filesToBeDeleted();
+    deleteFilterManager.removeDanglingDeletesFor(filesToBeDeleted);
+
     List<ManifestFile> filteredDeletes =
         deleteFilterManager.filterManifests(
             SnapshotUtil.schemaFor(base, targetBranch()),
@@ -1130,6 +1136,11 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
     protected Set<DataFile> newFileSet() {
       return DataFileSet.create();
     }
+
+    @Override
+    protected void removeDanglingDeletesFor(Set<DataFile> dataFiles) {
+      throw new UnsupportedOperationException("Cannot remove dangling 
deletes");
+    }
   }
 
   private class DataFileMergeManager extends ManifestMergeManager<DataFile> {
diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
index 8e4ffed5b8..cad294f97a 100644
--- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
@@ -467,6 +467,98 @@ public class TestDeleteFiles extends TestBase {
         .hasMessage("Referenced data file is required for DV");
   }
 
+  @TestTemplate
+  public void removingDataFileByExpressionAlsoRemovesDV() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+    DeleteFile dv1 =
+        FileMetadata.deleteFileBuilder(SPEC)
+            .ofPositionDeletes()
+            .withPath("/path/to/data-1-deletes.puffin")
+            .withFileSizeInBytes(10)
+            .withPartitionPath("data_bucket=0")
+            .withRecordCount(5)
+            .withReferencedDataFile(DATA_FILE_BUCKET_0_IDS_0_2.location())
+            .withContentOffset(4)
+            .withContentSizeInBytes(6)
+            .build();
+
+    DeleteFile dv2 =
+        FileMetadata.deleteFileBuilder(SPEC)
+            .ofPositionDeletes()
+            .withPath("/path/to/data-2-deletes.puffin")
+            .withFileSizeInBytes(10)
+            .withPartitionPath("data_bucket=0")
+            .withRecordCount(5)
+            .withReferencedDataFile(DATA_FILE_BUCKET_0_IDS_8_10.location())
+            .withContentOffset(4)
+            .withContentSizeInBytes(6)
+            .build();
+
+    commit(
+        table,
+        table
+            .newRowDelta()
+            .addRows(DATA_FILE_BUCKET_0_IDS_0_2)
+            .addRows(DATA_FILE_BUCKET_0_IDS_8_10)
+            .addDeletes(dv1)
+            .addDeletes(dv2),
+        branch);
+
+    Snapshot snapshot = latestSnapshot(table, branch);
+    assertThat(snapshot.sequenceNumber()).isEqualTo(1);
+    assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(1);
+
+    // deleting by row filter should also remove the orphaned dv1 from delete 
manifests
+    commit(table, 
table.newDelete().deleteFromRowFilter(Expressions.lessThan("id", 5)), branch);
+
+    Snapshot deleteSnap = latestSnapshot(table, branch);
+    assertThat(deleteSnap.sequenceNumber()).isEqualTo(2);
+    assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(2);
+
+    assertThat(deleteSnap.deleteManifests(table.io())).hasSize(1);
+    validateDeleteManifest(
+        deleteSnap.deleteManifests(table.io()).get(0),
+        dataSeqs(1L, 1L),
+        fileSeqs(1L, 1L),
+        ids(deleteSnap.snapshotId(), snapshot.snapshotId()),
+        files(dv1, dv2),
+        statuses(ManifestEntry.Status.DELETED, Status.EXISTING));
+  }
+
+  @TestTemplate
+  public void removingDataFileByPathAlsoRemovesDV() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+    commit(
+        table,
+        table
+            .newRowDelta()
+            .addRows(FILE_A)
+            .addRows(FILE_B)
+            .addDeletes(fileADeletes())
+            .addDeletes(fileBDeletes()),
+        branch);
+
+    Snapshot snapshot = latestSnapshot(table, branch);
+    assertThat(snapshot.sequenceNumber()).isEqualTo(1);
+    assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(1);
+
+    // deleting by path should also remove the orphaned DV for fileA from 
delete manifests
+    commit(table, table.newDelete().deleteFile(FILE_A.location()), branch);
+
+    Snapshot deleteSnap = latestSnapshot(table, branch);
+    assertThat(deleteSnap.sequenceNumber()).isEqualTo(2);
+    assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(2);
+
+    assertThat(deleteSnap.deleteManifests(table.io())).hasSize(1);
+    validateDeleteManifest(
+        deleteSnap.deleteManifests(table.io()).get(0),
+        dataSeqs(1L, 1L),
+        fileSeqs(1L, 1L),
+        ids(deleteSnap.snapshotId(), snapshot.snapshotId()),
+        files(fileADeletes(), fileBDeletes()),
+        statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING));
+  }
+
   private static ByteBuffer longToBuffer(long value) {
     return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, 
value);
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
index 24a9593d6e..29daeb995c 100644
--- a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
+++ b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
@@ -24,8 +24,9 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.junit.jupiter.api.TestTemplate;
@@ -106,11 +107,9 @@ public class TestReplacePartitions extends TestBase {
 
   @Parameters(name = "formatVersion = {0}, branch = {1}")
   protected static List<Object> parameters() {
-    return Arrays.asList(
-        new Object[] {1, "main"},
-        new Object[] {1, "testBranch"},
-        new Object[] {2, "main"},
-        new Object[] {2, "testBranch"});
+    return TestHelpers.ALL_VERSIONS.stream()
+        .flatMap(v -> Stream.of(new Object[] {v, "main"}, new Object[] {v, 
"branch"}))
+        .collect(Collectors.toList());
   }
 
   @TestTemplate
@@ -826,4 +825,45 @@ public class TestReplacePartitions extends TestBase {
   public void testEmptyPartitionPathWithUnpartitionedTable() {
     DataFiles.builder(PartitionSpec.unpartitioned()).withPartitionPath("");
   }
+
+  @TestTemplate
+  public void replacingAndMergingOnePartitionAlsoRemovesDV() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+    // ensure the overwrite results in a merge
+    table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, 
"1").commit();
+
+    commit(
+        table,
+        table
+            .newRowDelta()
+            .addRows(FILE_A)
+            .addRows(FILE_B)
+            .addDeletes(fileADeletes())
+            .addDeletes(fileBDeletes()),
+        branch);
+
+    Snapshot snapshot = latestSnapshot(table, branch);
+
+    // FILE_E has the same partition as FILE_A. The dv for FILE_A will be 
removed from the delete
+    // manifest
+    commit(table, table.newReplacePartitions().addFile(FILE_E), branch);
+
+    Snapshot replaceSnapshot = latestSnapshot(table, branch);
+    assertThat(replaceSnapshot.dataManifests(table.io())).hasSize(1);
+    assertThat(replaceSnapshot.deleteManifests(table.io())).hasSize(1);
+
+    validateManifestEntries(
+        replaceSnapshot.dataManifests(table.io()).get(0),
+        ids(replaceSnapshot.snapshotId(), replaceSnapshot.snapshotId(), 
snapshot.snapshotId()),
+        files(FILE_E, FILE_A, FILE_B),
+        statuses(Status.ADDED, Status.DELETED, Status.EXISTING));
+
+    validateDeleteManifest(
+        replaceSnapshot.deleteManifests(table.io()).get(0),
+        dataSeqs(1L, 1L),
+        fileSeqs(1L, 1L),
+        ids(replaceSnapshot.snapshotId(), snapshot.snapshotId()),
+        files(fileADeletes(), fileBDeletes()),
+        statuses(Status.DELETED, Status.EXISTING));
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
index 96427576b4..701fdc97f2 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
@@ -379,12 +379,12 @@ public class TestRewriteFiles extends TestBase {
         pending.allManifests(table.io()).get(2),
         dataSeqs(1L, 1L),
         fileSeqs(1L, 1L),
-        ids(baseSnapshotId, baseSnapshotId),
+        ids(formatVersion >= 3 ? pendingId : baseSnapshotId, baseSnapshotId),
         files(fileADeletes(), fileBDeletes()),
-        statuses(ADDED, ADDED));
+        statuses(formatVersion >= 3 ? DELETED : ADDED, formatVersion >= 3 ? 
EXISTING : ADDED));
 
-    // We should only get the 4 manifests that this test is expected to add.
-    assertThat(listManifestFiles()).hasSize(4);
+    // We should only get the 4 (5 for v3) manifests that this test is 
expected to add.
+    assertThat(listManifestFiles()).hasSize(formatVersion >= 3 ? 5 : 4);
   }
 
   @TestTemplate
@@ -777,4 +777,40 @@ public class TestRewriteFiles extends TestBase {
             .rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_A2)),
         branch);
   }
+
+  @TestTemplate
+  public void removingDataFileAlsoRemovesDV() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+    commit(
+        table,
+        table
+            .newRowDelta()
+            .addRows(FILE_A)
+            .addRows(FILE_B)
+            .addDeletes(fileADeletes())
+            .addDeletes(fileBDeletes()),
+        branch);
+
+    Snapshot snapshot = latestSnapshot(table, branch);
+    assertThat(snapshot.sequenceNumber()).isEqualTo(1);
+    assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(1);
+
+    commit(
+        table,
+        
table.newRewrite().validateFromSnapshot(snapshot.snapshotId()).deleteFile(FILE_A),
+        branch);
+
+    Snapshot deleteSnap = latestSnapshot(table, branch);
+    assertThat(deleteSnap.sequenceNumber()).isEqualTo(2);
+    assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(2);
+
+    assertThat(deleteSnap.deleteManifests(table.io())).hasSize(1);
+    validateDeleteManifest(
+        deleteSnap.deleteManifests(table.io()).get(0),
+        dataSeqs(1L, 1L),
+        fileSeqs(1L, 1L),
+        ids(deleteSnap.snapshotId(), snapshot.snapshotId()),
+        files(fileADeletes(), fileBDeletes()),
+        statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING));
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index 744281e8d8..4d1d11081d 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -32,6 +32,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -727,7 +728,7 @@ public class TestRowDelta extends TestBase {
     assertThat(latestSnapshot(table, branch).sequenceNumber()).isEqualTo(1);
     assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(1);
 
-    // deleting a specific data file will not affect a delete file
+    // deleting a specific data file will not affect a delete file in v2 or 
less
     commit(table, table.newDelete().deleteFile(FILE_A), branch);
 
     Snapshot deleteSnap = latestSnapshot(table, branch);
@@ -743,21 +744,26 @@ public class TestRowDelta extends TestBase {
         files(FILE_A),
         statuses(Status.DELETED));
 
-    assertThat(deleteSnap.deleteManifests(table.io())).hasSize(1);
+    Iterator<Long> ids =
+        formatVersion >= 3
+            ? ids(deleteSnap.snapshotId(), deltaSnapshotId)
+            : ids(deltaSnapshotId, deltaSnapshotId);
+    Iterator<Status> statuses =
+        formatVersion >= 3
+            ? statuses(Status.DELETED, Status.EXISTING)
+            : statuses(Status.ADDED, Status.ADDED);
     validateDeleteManifest(
         deleteSnap.deleteManifests(table.io()).get(0),
         dataSeqs(1L, 1L),
         fileSeqs(1L, 1L),
-        ids(deltaSnapshotId, deltaSnapshotId),
+        ids,
         files(fileADeletes(), fileBDeletes()),
-        statuses(Status.ADDED, Status.ADDED));
+        statuses);
 
     // the manifest that removed FILE_A will be dropped next commit, causing 
the min sequence number
-    // of all data files
-    // to be 2, the largest known sequence number. this will cause 
FILE_A_DELETES to be removed
-    // because it is too old
-    // to apply to any data files.
-    commit(table, table.newRowDelta().removeDeletes(FILE_B_DELETES), branch);
+    // of all data files to be 2, the largest known sequence number. This will 
cause FILE_A_DELETES
+    // to be removed because it is too old to apply to any data files.
+    commit(table, table.newRowDelta().removeDeletes(fileBDeletes()), branch);
 
     Snapshot nextSnap = latestSnapshot(table, branch);
     assertThat(nextSnap.sequenceNumber()).isEqualTo(3);
@@ -770,7 +776,7 @@ public class TestRowDelta extends TestBase {
         dataSeqs(1L, 1L),
         fileSeqs(1L, 1L),
         ids(nextSnap.snapshotId(), nextSnap.snapshotId()),
-        files(fileADeletes(), fileBDeletes()),
+        formatVersion >= 3 ? files(fileBDeletes()) : files(fileADeletes(), 
fileBDeletes()),
         statuses(Status.DELETED, Status.DELETED));
   }
 
@@ -803,9 +809,9 @@ public class TestRowDelta extends TestBase {
         deleteSnap.deleteManifests(table.io()).get(0),
         dataSeqs(1L),
         fileSeqs(1L),
-        ids(deltaSnapshotId),
+        ids(formatVersion >= 3 ? deleteSnap.snapshotId() : deltaSnapshotId),
         files(fileADeletes()),
-        statuses(Status.ADDED));
+        statuses(formatVersion >= 3 ? Status.DELETED : Status.ADDED));
 
     // the manifest that removed FILE_A will be dropped next merging commit, 
but FastAppend will not
     // remove it
@@ -839,9 +845,9 @@ public class TestRowDelta extends TestBase {
         nextSnap.deleteManifests(table.io()).get(0),
         dataSeqs(1L),
         fileSeqs(1L),
-        ids(deltaSnapshotId),
+        ids(formatVersion >= 3 ? deleteSnap.snapshotId() : deltaSnapshotId),
         files(fileADeletes()),
-        statuses(Status.ADDED));
+        statuses(formatVersion >= 3 ? Status.DELETED : Status.ADDED));
   }
 
   @TestTemplate
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 618647259b..135b3bd090 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -307,8 +307,7 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
 
     runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
 
-    // After #11131 we don't remove the delete files
-    assertFileNum(table, 1, 3);
+    assertFileNum(table, 1, 1);
 
     SimpleDataUtil.assertTableRecords(table, ImmutableList.of(createRecord(1, 
"c")));
 
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 618647259b..135b3bd090 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -307,8 +307,7 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
 
     runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
 
-    // After #11131 we don't remove the delete files
-    assertFileNum(table, 1, 3);
+    assertFileNum(table, 1, 1);
 
     SimpleDataUtil.assertTableRecords(table, ImmutableList.of(createRecord(1, 
"c")));
 
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 618647259b..135b3bd090 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -307,8 +307,7 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
 
     runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
 
-    // After #11131 we don't remove the delete files
-    assertFileNum(table, 1, 3);
+    assertFileNum(table, 1, 1);
 
     SimpleDataUtil.assertTableRecords(table, ImmutableList.of(createRecord(1, 
"c")));
 

Reply via email to