yyanyy commented on a change in pull request #2294:
URL: https://github.com/apache/iceberg/pull/2294#discussion_r596509652



##########
File path: core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
##########
@@ -228,6 +383,163 @@ public void testRecovery() {
     Assert.assertEquals("Only 3 manifests should exist", 3, 
listManifestFiles().size());
   }
 
+  @Test
+  public void testRecoverWhenRewriteBothDataAndDeleteFiles() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg 
format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A)
+        .addRows(FILE_B)
+        .addRows(FILE_C)
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_B_DELETES)
+        .commit();
+
+    long baseSnapshotId = readMetadata().currentSnapshot().snapshotId();
+    table.ops().failCommits(3);
+
+    RewriteFiles rewrite = table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES, 
FILE_B_DELETES),
+            ImmutableSet.of(FILE_D), ImmutableSet.of());
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 3 manifests", 3, 
pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+    ManifestFile manifest3 = pending.allManifests().get(2);
+
+    validateManifestEntries(manifest1,
+        ids(pending.snapshotId()),
+        files(FILE_D),
+        statuses(ADDED));
+
+    validateManifestEntries(manifest2,
+        ids(pending.snapshotId(), baseSnapshotId, baseSnapshotId),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(DELETED, EXISTING, EXISTING));
+
+    validateDeleteManifest(manifest3,
+        seqs(2, 2),
+        ids(pending.snapshotId(), pending.snapshotId()),
+        files(FILE_A_DELETES, FILE_B_DELETES),
+        statuses(DELETED, DELETED));
+
+    rewrite.commit();
+
+    Assert.assertTrue("Should reuse new manifest", new 
File(manifest1.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new 
File(manifest2.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new 
File(manifest3.path()).exists());
+
+    TableMetadata metadata = readMetadata();
+    List<ManifestFile> committedManifests = Lists.newArrayList(manifest1, 
manifest2, manifest3);
+    Assert.assertTrue("Should committed the manifests",
+        
metadata.currentSnapshot().allManifests().containsAll(committedManifests));

Review comment:
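       nit: could use 
`metadata.currentSnapshot().allManifests().equals(committedManifests)` instead of `containsAll`, which also verifies that no extra manifests were committed and that the order matches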

##########
File path: core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
##########
@@ -228,6 +383,163 @@ public void testRecovery() {
     Assert.assertEquals("Only 3 manifests should exist", 3, 
listManifestFiles().size());
   }
 
+  @Test
+  public void testRecoverWhenRewriteBothDataAndDeleteFiles() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg 
format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A)
+        .addRows(FILE_B)
+        .addRows(FILE_C)
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_B_DELETES)
+        .commit();
+
+    long baseSnapshotId = readMetadata().currentSnapshot().snapshotId();
+    table.ops().failCommits(3);
+
+    RewriteFiles rewrite = table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES, 
FILE_B_DELETES),
+            ImmutableSet.of(FILE_D), ImmutableSet.of());
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 3 manifests", 3, 
pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+    ManifestFile manifest3 = pending.allManifests().get(2);
+
+    validateManifestEntries(manifest1,
+        ids(pending.snapshotId()),
+        files(FILE_D),
+        statuses(ADDED));
+
+    validateManifestEntries(manifest2,
+        ids(pending.snapshotId(), baseSnapshotId, baseSnapshotId),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(DELETED, EXISTING, EXISTING));
+
+    validateDeleteManifest(manifest3,
+        seqs(2, 2),
+        ids(pending.snapshotId(), pending.snapshotId()),
+        files(FILE_A_DELETES, FILE_B_DELETES),
+        statuses(DELETED, DELETED));
+
+    rewrite.commit();
+
+    Assert.assertTrue("Should reuse new manifest", new 
File(manifest1.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new 
File(manifest2.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new 
File(manifest3.path()).exists());
+
+    TableMetadata metadata = readMetadata();
+    List<ManifestFile> committedManifests = Lists.newArrayList(manifest1, 
manifest2, manifest3);
+    Assert.assertTrue("Should committed the manifests",
+        
metadata.currentSnapshot().allManifests().containsAll(committedManifests));
+
+    // As commit success all the manifests added with rewrite should be 
available.
+    Assert.assertEquals("Only 5 manifest should exist", 5, 
listManifestFiles().size());
+  }
+
+  @Test
+  public void testReplaceEqualityDeletesWithPositionDeletes() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg 
format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A2)
+        .addDeletes(FILE_A2_DELETES)
+        .commit();
+
+    TableMetadata metadata = readMetadata();
+    long baseSnapshotId = metadata.currentSnapshot().snapshotId();
+
+    // Apply and commit the rewrite transaction.
+    RewriteFiles rewrite = table.newRewrite().rewriteFiles(
+        ImmutableSet.of(), ImmutableSet.of(FILE_A2_DELETES),
+        ImmutableSet.of(), ImmutableSet.of(FILE_B_DELETES)
+    );
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 3 manifests", 3, 
pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+    ManifestFile manifest3 = pending.allManifests().get(2);
+
+    validateManifestEntries(manifest1,
+        ids(baseSnapshotId),
+        files(FILE_A2),
+        statuses(ADDED));
+
+    validateDeleteManifest(manifest2,
+        seqs(2),
+        ids(pending.snapshotId()),
+        files(FILE_B_DELETES),
+        statuses(ADDED));
+
+    validateDeleteManifest(manifest3,
+        seqs(2),
+        ids(pending.snapshotId()),
+        files(FILE_A2_DELETES),
+        statuses(DELETED));
+
+    rewrite.commit();
+
+    Assert.assertTrue("Should reuse new manifest", new 
File(manifest1.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new 
File(manifest2.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new 
File(manifest3.path()).exists());
+
+    metadata = readMetadata();
+    List<ManifestFile> committedManifests = Lists.newArrayList(manifest1, 
manifest2, manifest3);
+    Assert.assertTrue("Should committed the manifests",
+        
metadata.currentSnapshot().allManifests().containsAll(committedManifests));

Review comment:
       nit: also `equals` here

##########
File path: api/src/main/java/org/apache/iceberg/RewriteFiles.java
##########
@@ -35,11 +36,30 @@
  */
 public interface RewriteFiles extends SnapshotUpdate<RewriteFiles> {
   /**
-   * Add a rewrite that replaces one set of files with another set that 
contains the same data.
+   * Add a rewrite that replaces one set of data files with another set that 
contains the same data.
    *
    * @param filesToDelete files that will be replaced (deleted), cannot be 
null or empty.
-   * @param filesToAdd files that will be added, cannot be null or empty.
+   * @param filesToAdd    files that will be added, cannot be null or empty.
    * @return this for method chaining
    */
-  RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> 
filesToAdd);
+  default RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> 
filesToAdd) {
+    return rewriteFiles(
+        filesToDelete,
+        ImmutableSet.of(),
+        filesToAdd,
+        ImmutableSet.of()
+    );
+  }
+
+  /**
+   * Add a rewrite that replaces one set of files with another set that 
contains the same data (format v2).

Review comment:
       nit: probably don't need to mention format v2 specifically

##########
File path: core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
##########
@@ -164,6 +207,66 @@ public void testAddAndDelete() {
     Assert.assertEquals("Only 3 manifests should exist", 3, 
listManifestFiles().size());
   }
 
+  @Test
+  public void testRewriteDataAndDeleteFiles() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg 
format v2. ", formatVersion > 1);
+    Assert.assertEquals("Table should start empty", 0, 
listManifestFiles().size());
+
+    table.newRowDelta()
+        .addRows(FILE_A)
+        .addRows(FILE_B)
+        .addRows(FILE_C)
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_B_DELETES)
+        .commit();
+
+    TableMetadata base = readMetadata();
+    Snapshot baseSnap = base.currentSnapshot();
+    long baseSnapshotId = baseSnap.snapshotId();
+    Assert.assertEquals("Should create 2 manifests for initial write", 2, 
baseSnap.allManifests().size());
+    List<ManifestFile> initialManifests = baseSnap.allManifests();
+
+    validateManifestEntries(initialManifests.get(0),
+        ids(baseSnapshotId, baseSnapshotId, baseSnapshotId),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(ADDED, ADDED, ADDED));
+    validateDeleteManifest(initialManifests.get(1),
+        seqs(1, 1),
+        ids(baseSnapshotId, baseSnapshotId),
+        files(FILE_A_DELETES, FILE_B_DELETES),
+        statuses(ADDED, ADDED));
+
+    // Rewrite the files.
+    Snapshot pending = table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES),
+            ImmutableSet.of(FILE_D), ImmutableSet.of())
+        .apply();
+
+    Assert.assertEquals("Should contain 3 manifest", 3, 
pending.allManifests().size());
+    Assert.assertFalse("Should not contain manifest from initial write",
+        pending.allManifests().containsAll(initialManifests));

Review comment:
       I think we want to check whether any of the pending manifests matches one of the initial manifests, e.g. 
`pending.allManifests().stream().anyMatch(initialManifests::contains)`?

##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -40,19 +40,55 @@ protected String operation() {
     return DataOperations.REPLACE;
   }
 
+  private void verifyInputAndOutputFiles(Set<DataFile> dataFilesToDelete, 
Set<DeleteFile> deleteFilesToDelete,
+                                         Set<DataFile> dataFilesToAdd, 
Set<DeleteFile> deleteFilesToAdd) {
+    int filesToDelete = 0;
+    if (dataFilesToDelete != null) {
+      filesToDelete += dataFilesToDelete.size();
+    }
+
+    if (deleteFilesToDelete != null) {
+      filesToDelete += deleteFilesToDelete.size();
+    }
+
+    Preconditions.checkArgument(filesToDelete > 0, "Files to delete cannot be 
null or empty");
+
+    if (deleteFilesToDelete == null || deleteFilesToDelete.isEmpty()) {
+      // When there is no delete files in the rewrite action, data files to 
add cannot be null or empty.
+      Preconditions.checkArgument(dataFilesToAdd != null && 
dataFilesToAdd.size() > 0,
+          "Data files to add can not be null or empty because there's no 
delete file to rewrite");
+      Preconditions.checkArgument(deleteFilesToAdd == null || 
deleteFilesToAdd.isEmpty(),
+          "Delete files to add must be null or empty because there's no delete 
file to rewrite");
+    }
+  }
+
   @Override
-  public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> 
filesToAdd) {
-    Preconditions.checkArgument(filesToDelete != null && 
!filesToDelete.isEmpty(),
-        "Files to delete cannot be null or empty");
-    Preconditions.checkArgument(filesToAdd != null && !filesToAdd.isEmpty(),

Review comment:
       It looks like we are now changing the logic so that the input sets are no longer 
required to be non-null? For the new code, I think we can add a precondition check that 
all four input sets are non-null, which would save all the null checks 
everywhere.

##########
File path: core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
##########
@@ -228,6 +383,163 @@ public void testRecovery() {
     Assert.assertEquals("Only 3 manifests should exist", 3, 
listManifestFiles().size());
   }
 
+  @Test
+  public void testRecoverWhenRewriteBothDataAndDeleteFiles() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg 
format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A)
+        .addRows(FILE_B)
+        .addRows(FILE_C)
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_B_DELETES)
+        .commit();
+
+    long baseSnapshotId = readMetadata().currentSnapshot().snapshotId();
+    table.ops().failCommits(3);
+
+    RewriteFiles rewrite = table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES, 
FILE_B_DELETES),
+            ImmutableSet.of(FILE_D), ImmutableSet.of());
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 3 manifests", 3, 
pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+    ManifestFile manifest3 = pending.allManifests().get(2);
+
+    validateManifestEntries(manifest1,
+        ids(pending.snapshotId()),
+        files(FILE_D),
+        statuses(ADDED));
+
+    validateManifestEntries(manifest2,
+        ids(pending.snapshotId(), baseSnapshotId, baseSnapshotId),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(DELETED, EXISTING, EXISTING));
+
+    validateDeleteManifest(manifest3,
+        seqs(2, 2),
+        ids(pending.snapshotId(), pending.snapshotId()),
+        files(FILE_A_DELETES, FILE_B_DELETES),
+        statuses(DELETED, DELETED));
+
+    rewrite.commit();
+
+    Assert.assertTrue("Should reuse new manifest", new 
File(manifest1.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new 
File(manifest2.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new 
File(manifest3.path()).exists());
+
+    TableMetadata metadata = readMetadata();
+    List<ManifestFile> committedManifests = Lists.newArrayList(manifest1, 
manifest2, manifest3);
+    Assert.assertTrue("Should committed the manifests",
+        
metadata.currentSnapshot().allManifests().containsAll(committedManifests));
+
+    // As commit success all the manifests added with rewrite should be 
available.
+    Assert.assertEquals("Only 5 manifest should exist", 5, 
listManifestFiles().size());
+  }
+
+  @Test
+  public void testReplaceEqualityDeletesWithPositionDeletes() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg 
format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A2)
+        .addDeletes(FILE_A2_DELETES)
+        .commit();
+
+    TableMetadata metadata = readMetadata();
+    long baseSnapshotId = metadata.currentSnapshot().snapshotId();
+
+    // Apply and commit the rewrite transaction.
+    RewriteFiles rewrite = table.newRewrite().rewriteFiles(
+        ImmutableSet.of(), ImmutableSet.of(FILE_A2_DELETES),
+        ImmutableSet.of(), ImmutableSet.of(FILE_B_DELETES)
+    );
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 3 manifests", 3, 
pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+    ManifestFile manifest3 = pending.allManifests().get(2);
+
+    validateManifestEntries(manifest1,
+        ids(baseSnapshotId),

Review comment:
       nit: should we verify that the sequence number for this is 1? 

##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -40,19 +40,55 @@ protected String operation() {
     return DataOperations.REPLACE;
   }
 
+  private void verifyInputAndOutputFiles(Set<DataFile> dataFilesToDelete, 
Set<DeleteFile> deleteFilesToDelete,
+                                         Set<DataFile> dataFilesToAdd, 
Set<DeleteFile> deleteFilesToAdd) {
+    int filesToDelete = 0;
+    if (dataFilesToDelete != null) {
+      filesToDelete += dataFilesToDelete.size();
+    }
+
+    if (deleteFilesToDelete != null) {
+      filesToDelete += deleteFilesToDelete.size();
+    }
+
+    Preconditions.checkArgument(filesToDelete > 0, "Files to delete cannot be 
null or empty");
+
+    if (deleteFilesToDelete == null || deleteFilesToDelete.isEmpty()) {
+      // When there is no delete files in the rewrite action, data files to 
add cannot be null or empty.
+      Preconditions.checkArgument(dataFilesToAdd != null && 
dataFilesToAdd.size() > 0,
+          "Data files to add can not be null or empty because there's no 
delete file to rewrite");

Review comment:
       Nit: we might want to update this to something like "Data files to add can not 
be null or empty when there is no delete file to be rewritten"; the same applies 
to L61.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to