This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 061ae58986 Core: Keep track of data files to be removed for orphaned DV detection (#13222)
061ae58986 is described below
commit 061ae58986db3495ff3af6f1932a96dd086e5fbd
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Jul 8 09:04:01 2025 +0200
Core: Keep track of data files to be removed for orphaned DV detection (#13222)
---
.../apache/iceberg/RewriteDataFilesBenchmark.java | 196 +++++++++++++++++++++
.../org/apache/iceberg/ManifestFilterManager.java | 36 +++-
.../apache/iceberg/MergingSnapshotProducer.java | 11 ++
.../java/org/apache/iceberg/TestDeleteFiles.java | 92 ++++++++++
.../org/apache/iceberg/TestReplacePartitions.java | 52 +++++-
.../java/org/apache/iceberg/TestRewriteFiles.java | 44 ++++-
.../test/java/org/apache/iceberg/TestRowDelta.java | 34 ++--
.../maintenance/api/TestRewriteDataFiles.java | 3 +-
.../maintenance/api/TestRewriteDataFiles.java | 3 +-
.../maintenance/api/TestRewriteDataFiles.java | 3 +-
10 files changed, 440 insertions(+), 34 deletions(-)
diff --git a/core/src/jmh/java/org/apache/iceberg/RewriteDataFilesBenchmark.java b/core/src/jmh/java/org/apache/iceberg/RewriteDataFilesBenchmark.java
new file mode 100644
index 0000000000..b56521548b
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/RewriteDataFilesBenchmark.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DataFileSet;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of rewriting data files in the table.
+ *
+ * <p>To run this benchmark: <code>
+ *   ./gradlew :iceberg-core:jmh
+ *       -PjmhIncludeRegex=RewriteDataFilesBenchmark
+ *       -PjmhOutputPath=benchmark/rewrite-data-files-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class RewriteDataFilesBenchmark {
+
+  private static final String TABLE_IDENT = "tblX";
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "int_col", Types.IntegerType.get()),
+          required(2, "long_col", Types.LongType.get()),
+          required(3, "decimal_col", Types.DecimalType.of(10, 10)),
+          required(4, "date_col", Types.DateType.get()),
+          required(5, "timestamp_col", Types.TimestampType.withoutZone()),
+          required(6, "timestamp_tz_col", Types.TimestampType.withZone()),
+          required(7, "str_col", Types.StringType.get()));
+  private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private Table table;
+  private DataFileSet dataFilesToRemove;
+  private DataFileSet dataFilesToAdd;
+
+  @Param({"50000", "100000", "500000", "1000000", "2000000"})
+  private int numFiles;
+
+  @Param({"5", "25", "50", "100"})
+  private int percentDataFilesRewritten;
+
+  @Setup
+  public void setupBenchmark() throws IOException {
+    initTable();
+    initFiles();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    dropTable();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void rewriteDataFiles() {
+    Snapshot currentSnapshot = table.currentSnapshot();
+    RewriteFiles rewriteFiles = table.newRewrite();
+    rewriteFiles.validateFromSnapshot(currentSnapshot.snapshotId());
+    dataFilesToAdd.forEach(rewriteFiles::addFile);
+    dataFilesToRemove.forEach(rewriteFiles::deleteFile);
+    rewriteFiles.commit();
+    table.manageSnapshots().rollbackTo(currentSnapshot.snapshotId()).commit();
+  }
+
+  private void initTable() {
+    if (TABLES.exists(TABLE_IDENT)) {
+      TABLES.dropTable(TABLE_IDENT);
+    }
+
+    this.table =
+        TABLES.create(
+            SCHEMA, SPEC, ImmutableMap.of(TableProperties.FORMAT_VERSION, "3"), TABLE_IDENT);
+  }
+
+  private void dropTable() {
+    TABLES.dropTable(TABLE_IDENT);
+  }
+
+  private void initFiles() throws IOException {
+    List<DataFile> pendingDataFiles = Lists.newArrayListWithExpectedSize(numFiles);
+    int numDataFilesToRewrite = (int) Math.ceil(numFiles * (percentDataFilesRewritten / 100.0));
+    Map<String, DataFile> filesToReplace = Maps.newHashMapWithExpectedSize(numDataFilesToRewrite);
+    RowDelta rowDelta = table.newRowDelta();
+    for (int ordinal = 0; ordinal < numFiles; ordinal++) {
+      DataFile dataFile = generateDataFile();
+      rowDelta.addRows(dataFile);
+      DeleteFile deleteFile = FileGenerationUtil.generateDV(table, dataFile);
+      rowDelta.addDeletes(deleteFile);
+      if (numDataFilesToRewrite > 0) {
+        filesToReplace.put(dataFile.location(), dataFile);
+        DataFile pendingDataFile = generateDataFile(dataFile.recordCount());
+        rowDelta.addRows(pendingDataFile);
+        pendingDataFiles.add(pendingDataFile);
+        numDataFilesToRewrite--;
+      }
+    }
+
+    rowDelta.commit();
+
+    List<DataFile> dataFilesReadFromManifests = Lists.newArrayList();
+    for (ManifestFile dataManifest : table.currentSnapshot().dataManifests(table.io())) {
+      try (ManifestReader<DataFile> manifestReader = ManifestFiles.read(dataManifest, table.io())) {
+        manifestReader
+            .iterator()
+            .forEachRemaining(
+                file -> {
+                  if (filesToReplace.containsKey(file.location())) {
+                    dataFilesReadFromManifests.add(file);
+                  }
+                });
+      }
+    }
+
+    this.dataFilesToRemove = DataFileSet.of(dataFilesReadFromManifests);
+    this.dataFilesToAdd = DataFileSet.of(pendingDataFiles);
+  }
+
+  private DataFile generateDataFile() {
+    return generateDataFile(-1L);
+  }
+
+  private DataFile generateDataFile(long recordCount) {
+    Schema schema = table.schema();
+    PartitionSpec spec = table.spec();
+    LocationProvider locations = table.locationProvider();
+    String path = locations.newDataLocation(spec, null, FileGenerationUtil.generateFileName());
+    long fileSize = ThreadLocalRandom.current().nextLong(50_000L);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+    Metrics metrics =
+        FileGenerationUtil.generateRandomMetrics(
+            schema, metricsConfig, ImmutableMap.of(), ImmutableMap.of());
+    if (recordCount > 0) {
+      metrics =
+          new Metrics(
+              recordCount,
+              metrics.columnSizes(),
+              metrics.valueCounts(),
+              metrics.nullValueCounts(),
+              metrics.nanValueCounts());
+    }
+
+    return DataFiles.builder(spec)
+        .withPath(path)
+        .withFileSizeInBytes(fileSize)
+        .withFormat(FileFormat.PARQUET)
+        .withMetrics(metrics)
+        .build();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
index 840d93ec10..9b5dce4467 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.ManifestFileUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
@@ -80,6 +81,9 @@ abstract class ManifestFilterManager<F extends ContentFile<F>> {
private int duplicateDeleteCount = 0;
private boolean caseSensitive = true;
private boolean allDeletesReferenceManifests = true;
+  // this is only being used for the DeleteManifestFilterManager to detect orphaned DVs for removed
+  // data file paths
+  private Set<String> removedDataFilePaths = Sets.newHashSet();
// cache filtered manifests to avoid extra work when commits fail.
private final Map<ManifestFile, ManifestFile> filteredManifests =
Maps.newConcurrentMap();
@@ -114,6 +118,10 @@ abstract class ManifestFilterManager<F extends ContentFile<F>> {
this.failMissingDeletePaths = true;
}
+ protected Set<F> filesToBeDeleted() {
+ return deleteFiles;
+ }
+
/**
* Add a filter to match files to delete. A file will be deleted if all of
the rows it contains
* match this or any other filter passed to this method.
@@ -155,6 +163,11 @@ abstract class ManifestFilterManager<F extends ContentFile<F>> {
this.caseSensitive = newCaseSensitive;
}
+  protected void removeDanglingDeletesFor(Set<DataFile> dataFiles) {
+    this.removedDataFilePaths =
+        dataFiles.stream().map(ContentFile::location).collect(Collectors.toSet());
+  }
+
/** Add a specific path to be deleted in the new snapshot. */
void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
@@ -224,7 +237,9 @@ abstract class ManifestFilterManager<F extends ContentFile<F>> {
private boolean canTrustManifestReferences(List<ManifestFile> manifests) {
Set<String> manifestLocations =
manifests.stream().map(ManifestFile::path).collect(Collectors.toSet());
- return allDeletesReferenceManifests &&
manifestLocations.containsAll(manifestsWithDeletes);
+ return allDeletesReferenceManifests
+ && !manifestsWithDeletes.isEmpty()
+ && manifestLocations.containsAll(manifestsWithDeletes);
}
/**
@@ -404,6 +419,8 @@ abstract class ManifestFilterManager<F extends ContentFile<F>> {
return true;
} else if (!deleteFiles.isEmpty()) {
return ManifestFileUtil.canContainAny(manifest, deleteFilePartitions,
specsById);
+ } else if (!removedDataFilePaths.isEmpty()) {
+ return true;
}
return false;
@@ -427,7 +444,8 @@ abstract class ManifestFilterManager<F extends ContentFile<F>> {
|| (isDelete
&& entry.isLive()
&& entry.dataSequenceNumber() > 0
- && entry.dataSequenceNumber() < minSequenceNumber);
+ && entry.dataSequenceNumber() < minSequenceNumber)
+ || (isDelete && isDanglingDV((DeleteFile) file));
if (markedForDelete || evaluator.rowsMightMatch(file)) {
boolean allRowsMatch = markedForDelete ||
evaluator.rowsMustMatch(file);
@@ -452,6 +470,10 @@ abstract class ManifestFilterManager<F extends ContentFile<F>> {
return false;
}
+  private boolean isDanglingDV(DeleteFile file) {
+    return ContentFileUtil.isDV(file) && removedDataFilePaths.contains(file.referencedDataFile());
+  }
+
@SuppressWarnings({"CollectionUndefinedEquality",
"checkstyle:CyclomaticComplexity"})
private ManifestFile filterManifestWithDeletedFiles(
PartitionAndMetricsEvaluator evaluator, ManifestFile manifest,
ManifestReader<F> reader) {
@@ -468,8 +490,10 @@ abstract class ManifestFilterManager<F extends ContentFile<F>> {
.forEach(
entry -> {
F file = entry.file();
+ boolean isDanglingDV = isDelete && isDanglingDV((DeleteFile)
file);
boolean markedForDelete =
- deletePaths.contains(file.location())
+ isDanglingDV
+ || deletePaths.contains(file.location())
|| deleteFiles.contains(file)
|| dropPartitions.contains(file.specId(),
file.partition())
|| (isDelete
@@ -488,6 +512,10 @@ abstract class ManifestFilterManager<F extends ContentFile<F>> {
if (allRowsMatch) {
writer.delete(entry);
+ F fileCopy = file.copyWithoutStats();
+ // add the file here in case it was deleted using an
expression. The
+ // DeleteManifestFilterManager will then remove its
matching DV
+ deleteFiles.add(fileCopy);
if (deletedFiles.contains(file)) {
LOG.warn(
@@ -498,7 +526,7 @@ abstract class ManifestFilterManager<F extends ContentFile<F>> {
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for
non-duplicate data
- deletedFiles.add(file.copyWithoutStats());
+ deletedFiles.add(fileCopy);
}
} else {
writer.existing(entry);
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index cae11dce55..9ed2f4f4c0 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -935,6 +935,12 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
.UNASSIGNED_SEQ) // filter out unassigned in
rewritten manifests
.reduce(base.lastSequenceNumber(), Math::min);
deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
+
+ // retrieve the data files to be deleted from the DataFileFilterManager
and pass it to the
+ // DeleteFileFilterManager so that it can potentially remove orphaned DVs
+ Set<DataFile> filesToBeDeleted = filterManager.filesToBeDeleted();
+ deleteFilterManager.removeDanglingDeletesFor(filesToBeDeleted);
+
List<ManifestFile> filteredDeletes =
deleteFilterManager.filterManifests(
SnapshotUtil.schemaFor(base, targetBranch()),
@@ -1130,6 +1136,11 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
protected Set<DataFile> newFileSet() {
return DataFileSet.create();
}
+
+    @Override
+    protected void removeDanglingDeletesFor(Set<DataFile> dataFiles) {
+      throw new UnsupportedOperationException("Cannot remove dangling deletes");
+    }
}
private class DataFileMergeManager extends ManifestMergeManager<DataFile> {
diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
index 8e4ffed5b8..cad294f97a 100644
--- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
@@ -467,6 +467,98 @@ public class TestDeleteFiles extends TestBase {
.hasMessage("Referenced data file is required for DV");
}
+  @TestTemplate
+  public void removingDataFileByExpressionAlsoRemovesDV() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+    DeleteFile dv1 =
+        FileMetadata.deleteFileBuilder(SPEC)
+            .ofPositionDeletes()
+            .withPath("/path/to/data-1-deletes.puffin")
+            .withFileSizeInBytes(10)
+            .withPartitionPath("data_bucket=0")
+            .withRecordCount(5)
+            .withReferencedDataFile(DATA_FILE_BUCKET_0_IDS_0_2.location())
+            .withContentOffset(4)
+            .withContentSizeInBytes(6)
+            .build();
+
+    DeleteFile dv2 =
+        FileMetadata.deleteFileBuilder(SPEC)
+            .ofPositionDeletes()
+            .withPath("/path/to/data-2-deletes.puffin")
+            .withFileSizeInBytes(10)
+            .withPartitionPath("data_bucket=0")
+            .withRecordCount(5)
+            .withReferencedDataFile(DATA_FILE_BUCKET_0_IDS_8_10.location())
+            .withContentOffset(4)
+            .withContentSizeInBytes(6)
+            .build();
+
+    commit(
+        table,
+        table
+            .newRowDelta()
+            .addRows(DATA_FILE_BUCKET_0_IDS_0_2)
+            .addRows(DATA_FILE_BUCKET_0_IDS_8_10)
+            .addDeletes(dv1)
+            .addDeletes(dv2),
+        branch);
+
+    Snapshot snapshot = latestSnapshot(table, branch);
+    assertThat(snapshot.sequenceNumber()).isEqualTo(1);
+    assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(1);
+
+    // deleting by row filter should also remove the orphaned dv1 from delete manifests
+    commit(table, table.newDelete().deleteFromRowFilter(Expressions.lessThan("id", 5)), branch);
+
+    Snapshot deleteSnap = latestSnapshot(table, branch);
+    assertThat(deleteSnap.sequenceNumber()).isEqualTo(2);
+    assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(2);
+
+    assertThat(deleteSnap.deleteManifests(table.io())).hasSize(1);
+    validateDeleteManifest(
+        deleteSnap.deleteManifests(table.io()).get(0),
+        dataSeqs(1L, 1L),
+        fileSeqs(1L, 1L),
+        ids(deleteSnap.snapshotId(), snapshot.snapshotId()),
+        files(dv1, dv2),
+        statuses(ManifestEntry.Status.DELETED, Status.EXISTING));
+  }
+
+  @TestTemplate
+  public void removingDataFileByPathAlsoRemovesDV() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+    commit(
+        table,
+        table
+            .newRowDelta()
+            .addRows(FILE_A)
+            .addRows(FILE_B)
+            .addDeletes(fileADeletes())
+            .addDeletes(fileBDeletes()),
+        branch);
+
+    Snapshot snapshot = latestSnapshot(table, branch);
+    assertThat(snapshot.sequenceNumber()).isEqualTo(1);
+    assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(1);
+
+    // deleting by path should also remove the orphaned DV for fileA from delete manifests
+    commit(table, table.newDelete().deleteFile(FILE_A.location()), branch);
+
+    Snapshot deleteSnap = latestSnapshot(table, branch);
+    assertThat(deleteSnap.sequenceNumber()).isEqualTo(2);
+    assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(2);
+
+    assertThat(deleteSnap.deleteManifests(table.io())).hasSize(1);
+    validateDeleteManifest(
+        deleteSnap.deleteManifests(table.io()).get(0),
+        dataSeqs(1L, 1L),
+        fileSeqs(1L, 1L),
+        ids(deleteSnap.snapshotId(), snapshot.snapshotId()),
+        files(fileADeletes(), fileBDeletes()),
+        statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING));
+  }
+
private static ByteBuffer longToBuffer(long value) {
return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0,
value);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
index 24a9593d6e..29daeb995c 100644
--- a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
+++ b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
@@ -24,8 +24,9 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.ValidationException;
import org.junit.jupiter.api.TestTemplate;
@@ -106,11 +107,9 @@ public class TestReplacePartitions extends TestBase {
@Parameters(name = "formatVersion = {0}, branch = {1}")
protected static List<Object> parameters() {
- return Arrays.asList(
- new Object[] {1, "main"},
- new Object[] {1, "testBranch"},
- new Object[] {2, "main"},
- new Object[] {2, "testBranch"});
+ return TestHelpers.ALL_VERSIONS.stream()
+ .flatMap(v -> Stream.of(new Object[] {v, "main"}, new Object[] {v,
"branch"}))
+ .collect(Collectors.toList());
}
@TestTemplate
@@ -826,4 +825,45 @@ public class TestReplacePartitions extends TestBase {
public void testEmptyPartitionPathWithUnpartitionedTable() {
DataFiles.builder(PartitionSpec.unpartitioned()).withPartitionPath("");
}
+
+  @TestTemplate
+  public void replacingAndMergingOnePartitionAlsoRemovesDV() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+    // ensure the overwrite results in a merge
+    table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1").commit();
+
+    commit(
+        table,
+        table
+            .newRowDelta()
+            .addRows(FILE_A)
+            .addRows(FILE_B)
+            .addDeletes(fileADeletes())
+            .addDeletes(fileBDeletes()),
+        branch);
+
+    Snapshot snapshot = latestSnapshot(table, branch);
+
+    // FILE_E has the same partition as FILE_A. The dv for FILE_A will be removed from the delete
+    // manifest
+    commit(table, table.newReplacePartitions().addFile(FILE_E), branch);
+
+    Snapshot replaceSnapshot = latestSnapshot(table, branch);
+    assertThat(replaceSnapshot.dataManifests(table.io())).hasSize(1);
+    assertThat(replaceSnapshot.deleteManifests(table.io())).hasSize(1);
+
+    validateManifestEntries(
+        replaceSnapshot.dataManifests(table.io()).get(0),
+        ids(replaceSnapshot.snapshotId(), replaceSnapshot.snapshotId(), snapshot.snapshotId()),
+        files(FILE_E, FILE_A, FILE_B),
+        statuses(Status.ADDED, Status.DELETED, Status.EXISTING));
+
+    validateDeleteManifest(
+        replaceSnapshot.deleteManifests(table.io()).get(0),
+        dataSeqs(1L, 1L),
+        fileSeqs(1L, 1L),
+        ids(replaceSnapshot.snapshotId(), snapshot.snapshotId()),
+        files(fileADeletes(), fileBDeletes()),
+        statuses(Status.DELETED, Status.EXISTING));
+  }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
index 96427576b4..701fdc97f2 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
@@ -379,12 +379,12 @@ public class TestRewriteFiles extends TestBase {
pending.allManifests(table.io()).get(2),
dataSeqs(1L, 1L),
fileSeqs(1L, 1L),
- ids(baseSnapshotId, baseSnapshotId),
+ ids(formatVersion >= 3 ? pendingId : baseSnapshotId, baseSnapshotId),
files(fileADeletes(), fileBDeletes()),
- statuses(ADDED, ADDED));
+ statuses(formatVersion >= 3 ? DELETED : ADDED, formatVersion >= 3 ?
EXISTING : ADDED));
- // We should only get the 4 manifests that this test is expected to add.
- assertThat(listManifestFiles()).hasSize(4);
+ // We should only get the 4 (5 for v3) manifests that this test is
expected to add.
+ assertThat(listManifestFiles()).hasSize(formatVersion >= 3 ? 5 : 4);
}
@TestTemplate
@@ -777,4 +777,40 @@ public class TestRewriteFiles extends TestBase {
.rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_A2)),
branch);
}
+
+  @TestTemplate
+  public void removingDataFileAlsoRemovesDV() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+    commit(
+        table,
+        table
+            .newRowDelta()
+            .addRows(FILE_A)
+            .addRows(FILE_B)
+            .addDeletes(fileADeletes())
+            .addDeletes(fileBDeletes()),
+        branch);
+
+    Snapshot snapshot = latestSnapshot(table, branch);
+    assertThat(snapshot.sequenceNumber()).isEqualTo(1);
+    assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(1);
+
+    commit(
+        table,
+        table.newRewrite().validateFromSnapshot(snapshot.snapshotId()).deleteFile(FILE_A),
+        branch);
+
+    Snapshot deleteSnap = latestSnapshot(table, branch);
+    assertThat(deleteSnap.sequenceNumber()).isEqualTo(2);
+    assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(2);
+
+    assertThat(deleteSnap.deleteManifests(table.io())).hasSize(1);
+    validateDeleteManifest(
+        deleteSnap.deleteManifests(table.io()).get(0),
+        dataSeqs(1L, 1L),
+        fileSeqs(1L, 1L),
+        ids(deleteSnap.snapshotId(), snapshot.snapshotId()),
+        files(fileADeletes(), fileBDeletes()),
+        statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING));
+  }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index 744281e8d8..4d1d11081d 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -32,6 +32,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -727,7 +728,7 @@ public class TestRowDelta extends TestBase {
assertThat(latestSnapshot(table, branch).sequenceNumber()).isEqualTo(1);
assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(1);
- // deleting a specific data file will not affect a delete file
+ // deleting a specific data file will not affect a delete file in v2 or
less
commit(table, table.newDelete().deleteFile(FILE_A), branch);
Snapshot deleteSnap = latestSnapshot(table, branch);
@@ -743,21 +744,26 @@ public class TestRowDelta extends TestBase {
files(FILE_A),
statuses(Status.DELETED));
- assertThat(deleteSnap.deleteManifests(table.io())).hasSize(1);
+ Iterator<Long> ids =
+ formatVersion >= 3
+ ? ids(deleteSnap.snapshotId(), deltaSnapshotId)
+ : ids(deltaSnapshotId, deltaSnapshotId);
+ Iterator<Status> statuses =
+ formatVersion >= 3
+ ? statuses(Status.DELETED, Status.EXISTING)
+ : statuses(Status.ADDED, Status.ADDED);
validateDeleteManifest(
deleteSnap.deleteManifests(table.io()).get(0),
dataSeqs(1L, 1L),
fileSeqs(1L, 1L),
- ids(deltaSnapshotId, deltaSnapshotId),
+ ids,
files(fileADeletes(), fileBDeletes()),
- statuses(Status.ADDED, Status.ADDED));
+ statuses);
// the manifest that removed FILE_A will be dropped next commit, causing
the min sequence number
- // of all data files
- // to be 2, the largest known sequence number. this will cause
FILE_A_DELETES to be removed
- // because it is too old
- // to apply to any data files.
- commit(table, table.newRowDelta().removeDeletes(FILE_B_DELETES), branch);
+ // of all data files to be 2, the largest known sequence number. This will
cause FILE_A_DELETES
+ // to be removed because it is too old to apply to any data files.
+ commit(table, table.newRowDelta().removeDeletes(fileBDeletes()), branch);
Snapshot nextSnap = latestSnapshot(table, branch);
assertThat(nextSnap.sequenceNumber()).isEqualTo(3);
@@ -770,7 +776,7 @@ public class TestRowDelta extends TestBase {
dataSeqs(1L, 1L),
fileSeqs(1L, 1L),
ids(nextSnap.snapshotId(), nextSnap.snapshotId()),
- files(fileADeletes(), fileBDeletes()),
+ formatVersion >= 3 ? files(fileBDeletes()) : files(fileADeletes(),
fileBDeletes()),
statuses(Status.DELETED, Status.DELETED));
}
@@ -803,9 +809,9 @@ public class TestRowDelta extends TestBase {
deleteSnap.deleteManifests(table.io()).get(0),
dataSeqs(1L),
fileSeqs(1L),
- ids(deltaSnapshotId),
+ ids(formatVersion >= 3 ? deleteSnap.snapshotId() : deltaSnapshotId),
files(fileADeletes()),
- statuses(Status.ADDED));
+ statuses(formatVersion >= 3 ? Status.DELETED : Status.ADDED));
// the manifest that removed FILE_A will be dropped next merging commit,
but FastAppend will not
// remove it
@@ -839,9 +845,9 @@ public class TestRowDelta extends TestBase {
nextSnap.deleteManifests(table.io()).get(0),
dataSeqs(1L),
fileSeqs(1L),
- ids(deltaSnapshotId),
+ ids(formatVersion >= 3 ? deleteSnap.snapshotId() : deltaSnapshotId),
files(fileADeletes()),
- statuses(Status.ADDED));
+ statuses(formatVersion >= 3 ? Status.DELETED : Status.ADDED));
}
@TestTemplate
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 618647259b..135b3bd090 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -307,8 +307,7 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
- // After #11131 we don't remove the delete files
- assertFileNum(table, 1, 3);
+ assertFileNum(table, 1, 1);
SimpleDataUtil.assertTableRecords(table, ImmutableList.of(createRecord(1,
"c")));
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 618647259b..135b3bd090 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -307,8 +307,7 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
- // After #11131 we don't remove the delete files
- assertFileNum(table, 1, 3);
+ assertFileNum(table, 1, 1);
SimpleDataUtil.assertTableRecords(table, ImmutableList.of(createRecord(1,
"c")));
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 618647259b..135b3bd090 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -307,8 +307,7 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
- // After #11131 we don't remove the delete files
- assertFileNum(table, 1, 3);
+ assertFileNum(table, 1, 1);
SimpleDataUtil.assertTableRecords(table, ImmutableList.of(createRecord(1,
"c")));