This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 23a578e5c6 Core: Prevent duplicate data/delete files (#10007)
23a578e5c6 is described below

commit 23a578e5c68dcb13d7474a6023d866f09ca512a9
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Thu Jun 20 09:44:13 2024 +0200

    Core: Prevent duplicate data/delete files (#10007)
---
 .../main/java/org/apache/iceberg/FastAppend.java   |  12 +-
 .../apache/iceberg/MergingSnapshotProducer.java    |  21 +-
 .../iceberg/TestBaseIncrementalAppendScan.java     |  26 +-
 .../java/org/apache/iceberg/TestFastAppend.java    |   7 +
 .../java/org/apache/iceberg/TestMergeAppend.java   |   7 +
 .../java/org/apache/iceberg/TestRewriteFiles.java  |   6 +-
 .../org/apache/iceberg/TestSnapshotSummary.java    | 275 ++++++++++++++++++++-
 7 files changed, 321 insertions(+), 33 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 14e776a92d..1439289130 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.CharSequenceSet;
 
 /**
  * {@link AppendFiles Append} implementation that adds a new manifest file for the write.
@@ -43,6 +44,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
   private final PartitionSpec spec;
   private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
   private final List<DataFile> newFiles = Lists.newArrayList();
+  private final CharSequenceSet newFilePaths = CharSequenceSet.empty();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
   private List<ManifestFile> newManifests = null;
@@ -83,9 +85,13 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
 
   @Override
   public FastAppend appendFile(DataFile file) {
-    this.hasNewFiles = true;
-    newFiles.add(file);
-    summaryBuilder.addedFile(spec, file);
+    Preconditions.checkNotNull(file, "Invalid data file: null");
+    if (newFilePaths.add(file.path())) {
+      this.hasNewFiles = true;
+      newFiles.add(file);
+      summaryBuilder.addedFile(spec, file);
+    }
+
     return this;
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index c1dc6b58b7..1a4560416d 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -80,6 +80,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
   // update data
   private final List<DataFile> newDataFiles = Lists.newArrayList();
+  private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
+  private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
   private Long newDataFilesDataSequenceNumber;
   private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
@@ -220,10 +222,12 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   /** Add a data file to the new snapshot. */
   protected void add(DataFile file) {
     Preconditions.checkNotNull(file, "Invalid data file: null");
-    setDataSpec(file);
-    addedFilesSummary.addedFile(dataSpec(), file);
-    hasNewDataFiles = true;
-    newDataFiles.add(file);
+    if (newDataFilePaths.add(file.path())) {
+      setDataSpec(file);
+      addedFilesSummary.addedFile(dataSpec(), file);
+      hasNewDataFiles = true;
+      newDataFiles.add(file);
+    }
   }
 
   /** Add a delete file to the new snapshot. */
@@ -243,9 +247,12 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     PartitionSpec fileSpec = ops.current().spec(specId);
     List<DeleteFileHolder> deleteFiles =
         newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());
-    deleteFiles.add(fileHolder);
-    addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
-    hasNewDeleteFiles = true;
+
+    if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) {
+      deleteFiles.add(fileHolder);
+      addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
+      hasNewDeleteFiles = true;
+    }
   }
 
   private void setDataSpec(DataFile file) {
diff --git a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
index 7b011b9134..119131a95a 100644
--- a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
@@ -67,13 +67,13 @@ public class TestBaseIncrementalAppendScan
     table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
 
     String tagSnapshotBName = "t2";
-    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
     long snapshotBId = table.currentSnapshot().snapshotId();
     table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
-    table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
+    table.newFastAppend().appendFile(FILE_D).appendFile(FILE_A2).commit();
 
     /*
-              files:FILE_A         files:FILE_B FILE_B       files:FILE_C FILE_C
+              files:FILE_A         files:FILE_B FILE_C       files:FILE_D FILE_A2
      ---- snapshotAId(tag:t1) ---- snapshotMainB(tag:t2) ----  currentSnapshot
     */
     IncrementalAppendScan scan = newScan().fromSnapshotInclusive(tagSnapshotAName);
@@ -111,11 +111,11 @@ public class TestBaseIncrementalAppendScan
     table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
 
     String tagName2 = "t2";
-    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
     long snapshotMainBId = table.currentSnapshot().snapshotId();
     table.manageSnapshots().createTag(tagName2, snapshotMainBId).commit();
 
-    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+    table.newFastAppend().appendFile(FILE_D).appendFile(FILE_A2).commit();
 
     table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
     long snapshotBranchBId = table.snapshot(branchName).snapshotId();
@@ -125,7 +125,7 @@ public class TestBaseIncrementalAppendScan
 
     /*
 
-            files:FILE_A         files:FILE_B FILE_B       files:FILE_B FILE_B
+            files:FILE_A         files:FILE_B FILE_C       files:FILE_D FILE_A2
      ---- snapshotA(tag:t1) ---- snapshotMainB(tag:t2) ----  currentSnapshot
                         \
                          \
@@ -175,21 +175,21 @@ public class TestBaseIncrementalAppendScan
     table.newFastAppend().appendFile(FILE_A).commit();
     long snapshotAId = table.currentSnapshot().snapshotId();
 
-    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
     long snapshotMainBId = table.currentSnapshot().snapshotId();
 
     String branchName = "b1";
     table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
-    table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
+    table.newFastAppend().appendFile(FILE_D).toBranch(branchName).commit();
     long snapshotBranchBId = table.snapshot(branchName).snapshotId();
 
     /*
 
-          files:FILE_A            files:FILE_B FILE_B
+          files:FILE_A            files:FILE_B FILE_C
           ---- snapshotA  ------ snapshotMainB
                         \
                          \
-                          \files:FILE_C
+                          \files:FILE_D
                           snapshotBranchB(branch:b1)
     */
     assertThatThrownBy(
@@ -267,13 +267,13 @@ public class TestBaseIncrementalAppendScan
     table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
 
     String tagSnapshotBName = "t2";
-    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
     long snapshotBId = table.currentSnapshot().snapshotId();
     table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
-    table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
+    table.newFastAppend().appendFile(FILE_D).appendFile(FILE_A2).commit();
 
     /*
-              files:FILE_A         files:FILE_B FILE_B       files:FILE_C FILE_C
+              files:FILE_A         files:FILE_B FILE_C       files:FILE_D FILE_A2
      ---- snapshotAId(tag:t1) ---- snapshotMainB(tag:t2) ----  currentSnapshot
     */
     IncrementalAppendScan scan = newScan().fromSnapshotExclusive(tagSnapshotAName);
diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
index c3fc710ebf..9dd479ecf0 100644
--- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
@@ -42,6 +42,13 @@ public class TestFastAppend extends TestBase {
     return Arrays.asList(1, 2);
   }
 
+  @TestTemplate
+  public void appendNullFile() {
+    assertThatThrownBy(() -> table.newFastAppend().appendFile(null).commit())
+        .isInstanceOf(NullPointerException.class)
+        .hasMessage("Invalid data file: null");
+  }
+
   @TestTemplate
   public void testEmptyTableAppend() {
     assertThat(listManifestFiles()).isEmpty();
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 17d6bd5a19..4719923e72 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -53,6 +53,13 @@ public class TestMergeAppend extends TestBase {
         new Object[] {2, "testBranch"});
   }
 
+  @TestTemplate
+  public void appendNullFile() {
+    assertThatThrownBy(() -> table.newAppend().appendFile(null).commit())
+        .isInstanceOf(NullPointerException.class)
+        .hasMessage("Invalid data file: null");
+  }
+
   @TestTemplate
   public void testEmptyTableAppend() {
     assertThat(listManifestFiles()).isEmpty();
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
index 948eda5528..124cc2f28d 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
@@ -194,9 +194,9 @@ public class TestRewriteFiles extends TestBase {
 
     validateManifestEntries(
         pending.allManifests(table.io()).get(1),
-        ids(pendingId, pendingId, baseSnapshotId),
-        files(FILE_A, FILE_A, FILE_B),
-        statuses(DELETED, DELETED, EXISTING));
+        ids(pendingId, baseSnapshotId),
+        files(FILE_A, FILE_B),
+        statuses(DELETED, EXISTING));
 
     // We should only get the 3 manifests that this test is expected to add.
     assertThat(listManifestFiles()).hasSize(3);
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java
index 23982c510d..053a9c3741 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.util.Arrays;
 import java.util.List;
@@ -60,27 +61,24 @@ public class TestSnapshotSummary extends TestBase {
         .deleteFile(FILE_B)
         .addFile(FILE_C)
         .addFile(FILE_D)
-        .addFile(FILE_D)
         .commit();
     summary = table.currentSnapshot().summary();
     assertThat(summary)
-        .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "30")
+        .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "20")
         .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "20")
-        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "30");
+        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "20");
 
     table.newDelete().deleteFile(FILE_C).deleteFile(FILE_D).commit();
     summary = table.currentSnapshot().summary();
     assertThat(summary)
         .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "20")
-        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "0")
         .doesNotContainKey(SnapshotSummary.ADDED_FILE_SIZE_PROP);
   }
 
   @TestTemplate
   public void testFileSizeSummaryWithDeletes() {
-    if (formatVersion == 1) {
-      return;
-    }
+    assumeThat(formatVersion).isGreaterThan(1);
 
     table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
 
@@ -97,4 +95,267 @@ public class TestSnapshotSummary extends TestBase {
     Map<String, String> summary = table.currentSnapshot().summary();
     assertThat(summary).containsKey("iceberg-version");
   }
+
+  @TestTemplate
+  public void fastAppendWithDuplicates() {
+    assertThat(listManifestFiles()).isEmpty();
+
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(DataFiles.builder(SPEC).copy(FILE_A).build())
+        .appendFile(FILE_A)
+        .commit();
+
+    assertThat(table.currentSnapshot().summary())
+        .hasSize(11)
+        .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+        .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+  }
+
+  @TestTemplate
+  public void mergeAppendWithDuplicates() {
+    assertThat(listManifestFiles()).isEmpty();
+
+    table
+        .newAppend()
+        .appendFile(FILE_A)
+        .appendFile(DataFiles.builder(SPEC).copy(FILE_A).build())
+        .appendFile(FILE_A)
+        .commit();
+
+    assertThat(table.currentSnapshot().summary())
+        .hasSize(11)
+        .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+        .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+  }
+
+  @TestTemplate
+  public void overwriteWithDuplicates() {
+    assertThat(listManifestFiles()).isEmpty();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    table
+        .newOverwrite()
+        .deleteFile(FILE_A)
+        .deleteFile(DataFiles.builder(SPEC).copy(FILE_A).build())
+        .deleteFile(FILE_A)
+        .addFile(FILE_C)
+        .addFile(DataFiles.builder(SPEC).copy(FILE_C).build())
+        .addFile(FILE_C)
+        .commit();
+
+    assertThat(table.currentSnapshot().summary())
+        .hasSize(14)
+        .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+        .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "2")
+        .containsEntry(SnapshotSummary.DELETED_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.DELETED_RECORDS_PROP, "1")
+        .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+  }
+
+  @TestTemplate
+  public void deleteWithDuplicates() {
+    assertThat(listManifestFiles()).isEmpty();
+    table.newFastAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
+
+    table
+        .newDelete()
+        .deleteFile(FILE_C)
+        .deleteFile(DataFiles.builder(SPEC).copy(FILE_C).build())
+        .deleteFile(FILE_C)
+        .deleteFile(FILE_D)
+        .deleteFile(DataFiles.builder(SPEC).copy(FILE_D).build())
+        .deleteFile(FILE_D)
+        .commit();
+
+    assertThat(table.currentSnapshot().summary())
+        .hasSize(11)
+        .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "2")
+        .containsEntry(SnapshotSummary.DELETED_FILES_PROP, "2")
+        .containsEntry(SnapshotSummary.DELETED_RECORDS_PROP, "2")
+        .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "20")
+        .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "0");
+  }
+
+  @TestTemplate
+  public void replacePartitionsWithDuplicates() {
+    assertThat(listManifestFiles()).isEmpty();
+
+    table
+        .newReplacePartitions()
+        .addFile(FILE_A)
+        .addFile(DataFiles.builder(SPEC).copy(FILE_A).build())
+        .addFile(FILE_A)
+        .commit();
+
+    assertThat(table.currentSnapshot().summary())
+        .hasSize(12)
+        .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+        .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "1")
+        .containsEntry(SnapshotSummary.REPLACE_PARTITIONS_PROP, "true")
+        .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+  }
+
+  @TestTemplate
+  public void rowDeltaWithDuplicates() {
+    assertThat(listManifestFiles()).isEmpty();
+
+    table
+        .newRowDelta()
+        .addRows(FILE_A)
+        .addRows(DataFiles.builder(SPEC).copy(FILE_A).build())
+        .addRows(FILE_A)
+        .commit();
+
+    assertThat(table.currentSnapshot().summary())
+        .hasSize(11)
+        .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+        .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+  }
+
+  @TestTemplate
+  public void rowDeltaWithDeletesAndDuplicates() {
+    assumeThat(formatVersion).isGreaterThan(1);
+    assertThat(listManifestFiles()).isEmpty();
+
+    table
+        .newRowDelta()
+        .addRows(FILE_A)
+        .addRows(DataFiles.builder(SPEC).copy(FILE_A).build())
+        .addRows(FILE_A)
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FileMetadata.deleteFileBuilder(SPEC).copy(FILE_A_DELETES).build())
+        .addDeletes(FILE_A_DELETES)
+        .commit();
+
+    assertThat(table.currentSnapshot().summary())
+        .hasSize(14)
+        .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.ADDED_DELETE_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "20") // size of data + delete file
+        .containsEntry(SnapshotSummary.ADD_POS_DELETE_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.ADDED_POS_DELETES_PROP, "1")
+        .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+        .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "20")
+        .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+  }
+
+  @TestTemplate
+  public void rewriteWithDuplicateFiles() {
+    assertThat(listManifestFiles()).isEmpty();
+
+    table.newAppend().appendFile(FILE_A2).appendFile(FILE_A2).appendFile(FILE_A2).commit();
+
+    table
+        .newRewrite()
+        .deleteFile(FILE_A2)
+        .deleteFile(DataFiles.builder(SPEC).copy(FILE_A2).build())
+        .deleteFile(FILE_A2)
+        .addFile(FILE_A)
+        .addFile(DataFiles.builder(SPEC).copy(FILE_A).build())
+        .addFile(FILE_A)
+        .commit();
+
+    assertThat(table.currentSnapshot().summary())
+        .hasSize(14)
+        .containsEntry(SnapshotSummary.ADDED_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.ADDED_RECORDS_PROP, "1")
+        .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "1")
+        .containsEntry(SnapshotSummary.DELETED_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.DELETED_RECORDS_PROP, "1")
+        .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+  }
+
+  @TestTemplate
+  public void rewriteWithDeletesAndDuplicates() {
+    assumeThat(formatVersion).isGreaterThan(1);
+    assertThat(listManifestFiles()).isEmpty();
+
+    table.newRowDelta().addRows(FILE_A2).addDeletes(FILE_A_DELETES).commit();
+
+    table
+        .newRewrite()
+        .deleteFile(FILE_A_DELETES)
+        .deleteFile(FileMetadata.deleteFileBuilder(SPEC).copy(FILE_A_DELETES).build())
+        .deleteFile(FILE_A_DELETES)
+        .addFile(FILE_B_DELETES)
+        .addFile(FileMetadata.deleteFileBuilder(SPEC).copy(FILE_B_DELETES).build())
+        .addFile(FILE_B_DELETES)
+        .commit();
+
+    assertThat(table.currentSnapshot().summary())
+        .hasSize(16)
+        .containsEntry(SnapshotSummary.ADDED_DELETE_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.ADD_POS_DELETE_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.ADDED_POS_DELETES_PROP, "1")
+        .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "2")
+        .containsEntry(SnapshotSummary.REMOVED_DELETE_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "10")
+        .containsEntry(SnapshotSummary.REMOVED_POS_DELETE_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.REMOVED_POS_DELETES_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "1")
+        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "20")
+        .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
+  }
 }

Reply via email to