This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 23a578e5c6 Core: Prevent duplicate data/delete files (#10007)
23a578e5c6 is described below
commit 23a578e5c68dcb13d7474a6023d866f09ca512a9
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Thu Jun 20 09:44:13 2024 +0200
Core: Prevent duplicate data/delete files (#10007)
---
.../main/java/org/apache/iceberg/FastAppend.java | 12 +-
.../apache/iceberg/MergingSnapshotProducer.java | 21 +-
.../iceberg/TestBaseIncrementalAppendScan.java | 26 +-
.../java/org/apache/iceberg/TestFastAppend.java | 7 +
.../java/org/apache/iceberg/TestMergeAppend.java | 7 +
.../java/org/apache/iceberg/TestRewriteFiles.java | 6 +-
.../org/apache/iceberg/TestSnapshotSummary.java | 275 ++++++++++++++++++++-
7 files changed, 321 insertions(+), 33 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 14e776a92d..1439289130 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.CharSequenceSet;
/**
* {@link AppendFiles Append} implementation that adds a new manifest file for the write.
@@ -43,6 +44,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
+ private final CharSequenceSet newFilePaths = CharSequenceSet.empty();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private List<ManifestFile> newManifests = null;
@@ -83,9 +85,13 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
@Override
public FastAppend appendFile(DataFile file) {
- this.hasNewFiles = true;
- newFiles.add(file);
- summaryBuilder.addedFile(spec, file);
+ Preconditions.checkNotNull(file, "Invalid data file: null");
+ if (newFilePaths.add(file.path())) {
+ this.hasNewFiles = true;
+ newFiles.add(file);
+ summaryBuilder.addedFile(spec, file);
+ }
+
return this;
}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index c1dc6b58b7..1a4560416d 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -80,6 +80,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
// update data
private final List<DataFile> newDataFiles = Lists.newArrayList();
+ private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
+ private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
@@ -220,10 +222,12 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
/** Add a data file to the new snapshot. */
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
- setDataSpec(file);
- addedFilesSummary.addedFile(dataSpec(), file);
- hasNewDataFiles = true;
- newDataFiles.add(file);
+ if (newDataFilePaths.add(file.path())) {
+ setDataSpec(file);
+ addedFilesSummary.addedFile(dataSpec(), file);
+ hasNewDataFiles = true;
+ newDataFiles.add(file);
+ }
}
/** Add a delete file to the new snapshot. */
@@ -243,9 +247,12 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
PartitionSpec fileSpec = ops.current().spec(specId);
List<DeleteFileHolder> deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());
- deleteFiles.add(fileHolder);
- addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
- hasNewDeleteFiles = true;
+
+ if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) {
+ deleteFiles.add(fileHolder);
+ addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
+ hasNewDeleteFiles = true;
+ }
}
private void setDataSpec(DataFile file) {
diff --git a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
index 7b011b9134..119131a95a 100644
--- a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
@@ -67,13 +67,13 @@ public class TestBaseIncrementalAppendScan
table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
String tagSnapshotBName = "t2";
- table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+ table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
long snapshotBId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
- table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
+ table.newFastAppend().appendFile(FILE_D).appendFile(FILE_A2).commit();
/*
- files:FILE_A files:FILE_B FILE_B files:FILE_C FILE_C
+ files:FILE_A files:FILE_B FILE_C files:FILE_D FILE_A2
---- snapshotAId(tag:t1) ---- snapshotMainB(tag:t2) ---- currentSnapshot
*/
IncrementalAppendScan scan = newScan().fromSnapshotInclusive(tagSnapshotAName);
@@ -111,11 +111,11 @@ public class TestBaseIncrementalAppendScan
table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
String tagName2 = "t2";
- table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+ table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
long snapshotMainBId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag(tagName2, snapshotMainBId).commit();
- table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+ table.newFastAppend().appendFile(FILE_D).appendFile(FILE_A2).commit();
table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
long snapshotBranchBId = table.snapshot(branchName).snapshotId();
@@ -125,7 +125,7 @@ public class TestBaseIncrementalAppendScan
/*
- files:FILE_A files:FILE_B FILE_B files:FILE_B FILE_B
+ files:FILE_A files:FILE_B FILE_C files:FILE_D FILE_A2
---- snapshotA(tag:t1) ---- snapshotMainB(tag:t2) ---- currentSnapshot
\
\
@@ -175,21 +175,21 @@ public class TestBaseIncrementalAppendScan
table.newFastAppend().appendFile(FILE_A).commit();
long snapshotAId = table.currentSnapshot().snapshotId();
- table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+ table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
long snapshotMainBId = table.currentSnapshot().snapshotId();
String branchName = "b1";
table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
- table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
+ table.newFastAppend().appendFile(FILE_D).toBranch(branchName).commit();
long snapshotBranchBId = table.snapshot(branchName).snapshotId();
/*
- files:FILE_A files:FILE_B FILE_B
+ files:FILE_A files:FILE_B FILE_C
---- snapshotA ------ snapshotMainB
\
\
- \files:FILE_C
+ \files:FILE_D
snapshotBranchB(branch:b1)
*/
assertThatThrownBy(
@@ -267,13 +267,13 @@ public class TestBaseIncrementalAppendScan
table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
String tagSnapshotBName = "t2";
- table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+ table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
long snapshotBId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
- table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
+ table.newFastAppend().appendFile(FILE_D).appendFile(FILE_A2).commit();
/*
- files:FILE_A files:FILE_B FILE_B files:FILE_C FILE_C
+ files:FILE_A files:FILE_B FILE_C files:FILE_D FILE_A2
---- snapshotAId(tag:t1) ---- snapshotMainB(tag:t2) ---- currentSnapshot
*/
IncrementalAppendScan scan = newScan().fromSnapshotExclusive(tagSnapshotAName);
diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
index c3fc710ebf..9dd479ecf0 100644
--- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
@@ -42,6 +42,13 @@ public class TestFastAppend extends TestBase {
return Arrays.asList(1, 2);
}
+ @TestTemplate
+ public void appendNullFile() {
+ assertThatThrownBy(() -> table.newFastAppend().appendFile(null).commit())
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid data file: null");
+ }
+
@TestTemplate
public void testEmptyTableAppend() {
assertThat(listManifestFiles()).isEmpty();
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 17d6bd5a19..4719923e72 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -53,6 +53,13 @@ public class TestMergeAppend extends TestBase {
new Object[] {2, "testBranch"});
}
+ @TestTemplate
+ public void appendNullFile() {
+ assertThatThrownBy(() -> table.newAppend().appendFile(null).commit())
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid data file: null");
+ }
+
@TestTemplate
public void testEmptyTableAppend() {
assertThat(listManifestFiles()).isEmpty();
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
index 948eda5528..124cc2f28d 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
@@ -194,9 +194,9 @@ public class TestRewriteFiles extends TestBase {
validateManifestEntries(
pending.allManifests(table.io()).get(1),
- ids(pendingId, pendingId, baseSnapshotId),
- files(FILE_A, FILE_A, FILE_B),
- statuses(DELETED, DELETED, EXISTING));
+ ids(pendingId, baseSnapshotId),
+ files(FILE_A, FILE_B),
+ statuses(DELETED, EXISTING));
// We should only get the 3 manifests that this test is expected to add.
assertThat(listManifestFiles()).hasSize(3);
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java
index 23982c510d..053a9c3741 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.util.Arrays;
import java.util.List;
@@ -60,27 +61,24 @@ public class TestSnapshotSummary extends TestBase {
.deleteFile(FILE_B)
.addFile(FILE_C)
.addFile(FILE_D)
- .addFile(FILE_D)
.commit();
summary = table.currentSnapshot().summary();
assertThat(summary)
- .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "30")
+ .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "20")
.containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "20")
- .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "30");
+ .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "20");
table.newDelete().deleteFile(FILE_C).deleteFile(FILE_D).commit();
summary = table.currentSnapshot().summary();
assertThat(summary)
.containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "20")
- .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "0")
.doesNotContainKey(SnapshotSummary.ADDED_FILE_SIZE_PROP);
}
@TestTemplate
public void testFileSizeSummaryWithDeletes() {
- if (formatVersion == 1) {
- return;
- }
+ assumeThat(formatVersion).isGreaterThan(1);
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
@@ -97,4 +95,267 @@ public class TestSnapshotSummary extends TestBase {
Map<String, String> summary = table.currentSnapshot().summary();
assertThat(summary).containsKey("iceberg-version");
}
+
+ @TestTemplate
+ public void fastAppendWithDuplicates() {
+ assertThat(listManifestFiles()).isEmpty();
+
+ table
+ .newFastAppend()
+ .appendFile(FILE_A)
+ .appendFile(DataFiles.builder(SPEC).copy(FILE_A).build())
+ .appendFile(FILE_A)
+ .commit();
+
+ assertThat(table.currentSnapshot().summary())
+ .hasSize(11)
+ .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+ .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+ }
+
+ @TestTemplate
+ public void mergeAppendWithDuplicates() {
+ assertThat(listManifestFiles()).isEmpty();
+
+ table
+ .newAppend()
+ .appendFile(FILE_A)
+ .appendFile(DataFiles.builder(SPEC).copy(FILE_A).build())
+ .appendFile(FILE_A)
+ .commit();
+
+ assertThat(table.currentSnapshot().summary())
+ .hasSize(11)
+ .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+ .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+ }
+
+ @TestTemplate
+ public void overwriteWithDuplicates() {
+ assertThat(listManifestFiles()).isEmpty();
+ table.newFastAppend().appendFile(FILE_A).commit();
+
+ table
+ .newOverwrite()
+ .deleteFile(FILE_A)
+ .deleteFile(DataFiles.builder(SPEC).copy(FILE_A).build())
+ .deleteFile(FILE_A)
+ .addFile(FILE_C)
+ .addFile(DataFiles.builder(SPEC).copy(FILE_C).build())
+ .addFile(FILE_C)
+ .commit();
+
+ assertThat(table.currentSnapshot().summary())
+ .hasSize(14)
+ .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+ .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "2")
+ .containsEntry(SnapshotSummary.DELETED_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.DELETED_RECORDS_PROP, "1")
+ .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+ }
+
+ @TestTemplate
+ public void deleteWithDuplicates() {
+ assertThat(listManifestFiles()).isEmpty();
+ table.newFastAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
+
+ table
+ .newDelete()
+ .deleteFile(FILE_C)
+ .deleteFile(DataFiles.builder(SPEC).copy(FILE_C).build())
+ .deleteFile(FILE_C)
+ .deleteFile(FILE_D)
+ .deleteFile(DataFiles.builder(SPEC).copy(FILE_D).build())
+ .deleteFile(FILE_D)
+ .commit();
+
+ assertThat(table.currentSnapshot().summary())
+ .hasSize(11)
+ .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "2")
+ .containsEntry(SnapshotSummary.DELETED_FILES_PROP, "2")
+ .containsEntry(SnapshotSummary.DELETED_RECORDS_PROP, "2")
+ .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "20")
+ .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "0");
+ }
+
+ @TestTemplate
+ public void replacePartitionsWithDuplicates() {
+ assertThat(listManifestFiles()).isEmpty();
+
+ table
+ .newReplacePartitions()
+ .addFile(FILE_A)
+ .addFile(DataFiles.builder(SPEC).copy(FILE_A).build())
+ .addFile(FILE_A)
+ .commit();
+
+ assertThat(table.currentSnapshot().summary())
+ .hasSize(12)
+ .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+ .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "1")
+ .containsEntry(SnapshotSummary.REPLACE_PARTITIONS_PROP, "true")
+ .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+ }
+
+ @TestTemplate
+ public void rowDeltaWithDuplicates() {
+ assertThat(listManifestFiles()).isEmpty();
+
+ table
+ .newRowDelta()
+ .addRows(FILE_A)
+ .addRows(DataFiles.builder(SPEC).copy(FILE_A).build())
+ .addRows(FILE_A)
+ .commit();
+
+ assertThat(table.currentSnapshot().summary())
+ .hasSize(11)
+ .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+ .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+ }
+
+ @TestTemplate
+ public void rowDeltaWithDeletesAndDuplicates() {
+ assumeThat(formatVersion).isGreaterThan(1);
+ assertThat(listManifestFiles()).isEmpty();
+
+ table
+ .newRowDelta()
+ .addRows(FILE_A)
+ .addRows(DataFiles.builder(SPEC).copy(FILE_A).build())
+ .addRows(FILE_A)
+ .addDeletes(FILE_A_DELETES)
+ .addDeletes(FileMetadata.deleteFileBuilder(SPEC).copy(FILE_A_DELETES).build())
+ .addDeletes(FILE_A_DELETES)
+ .commit();
+
+ assertThat(table.currentSnapshot().summary())
+ .hasSize(14)
+ .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.ADDED_DELETE_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "20") // size of data + delete file
+ .containsEntry(SnapshotSummary.ADD_POS_DELETE_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.ADDED_POS_DELETES_PROP, "1")
+ .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+ .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "20")
+ .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+ }
+
+ @TestTemplate
+ public void rewriteWithDuplicateFiles() {
+ assertThat(listManifestFiles()).isEmpty();
+
+ table.newAppend().appendFile(FILE_A2).appendFile(FILE_A2).appendFile(FILE_A2).commit();
+
+ table
+ .newRewrite()
+ .deleteFile(FILE_A2)
+ .deleteFile(DataFiles.builder(SPEC).copy(FILE_A2).build())
+ .deleteFile(FILE_A2)
+ .addFile(FILE_A)
+ .addFile(DataFiles.builder(SPEC).copy(FILE_A).build())
+ .addFile(FILE_A)
+ .commit();
+
+ assertThat(table.currentSnapshot().summary())
+ .hasSize(14)
+ .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+ .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "1")
+ .containsEntry(SnapshotSummary.DELETED_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.DELETED_RECORDS_PROP, "1")
+ .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+ }
+
+ @TestTemplate
+ public void rewriteWithDeletesAndDuplicates() {
+ assumeThat(formatVersion).isGreaterThan(1);
+ assertThat(listManifestFiles()).isEmpty();
+
+ table.newRowDelta().addRows(FILE_A2).addDeletes(FILE_A_DELETES).commit();
+
+ table
+ .newRewrite()
+ .deleteFile(FILE_A_DELETES)
+ .deleteFile(FileMetadata.deleteFileBuilder(SPEC).copy(FILE_A_DELETES).build())
+ .deleteFile(FILE_A_DELETES)
+ .addFile(FILE_B_DELETES)
+ .addFile(FileMetadata.deleteFileBuilder(SPEC).copy(FILE_B_DELETES).build())
+ .addFile(FILE_B_DELETES)
+ .commit();
+
+ assertThat(table.currentSnapshot().summary())
+ .hasSize(16)
+ .containsEntry(SnapshotSummary.ADDED_DELETE_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.ADD_POS_DELETE_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.ADDED_POS_DELETES_PROP, "1")
+ .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "2")
+ .containsEntry(SnapshotSummary.REMOVED_DELETE_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "10")
+ .containsEntry(SnapshotSummary.REMOVED_POS_DELETE_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.REMOVED_POS_DELETES_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+ .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "1")
+ .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "20")
+ .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+ }
}