stevenzwu commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1167033674


##########
core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java:
##########
@@ -198,6 +200,21 @@ public BaseRewriteDataFilesAction<ThisT> filter(Expression expr) {
     return this;
   }
 
+  /**
+   * If the compaction should use the sequence number of the snapshot at compaction start time for
+   * new data files, instead of using the sequence number of the newly produced snapshot.
+   *
+   * <p>This avoids commit conflicts with updates that add newer equality deletes at a higher
+   * sequence number.
+   *
+   * @param useStarting use starting sequence number if set to true
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> useStartingSequenceNumber(boolean useStarting) {

Review Comment:
   it seems that `BaseRewriteDataFilesAction` is only extended by Flink actions. I am fine with adding this API in the core module as a quick fix for now, and I agree that the long-term plan should be the new rewrite interfaces. But for backward compatibility, should the default be `false`, with the Flink rewrite action passing in `true`?
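   A rough sketch of what I have in mind (the field name and the `table` variable are just illustrative, not the actual patch):
   
   ```java
   // core module: default to false so existing callers keep the old behavior
   private boolean useStartingSequenceNumber = false;
   
   public BaseRewriteDataFilesAction<ThisT> useStartingSequenceNumber(boolean useStarting) {
     this.useStartingSequenceNumber = useStarting;
     return this;
   }
   
   // Flink rewrite action opts in explicitly
   Actions.forTable(table)
       .rewriteDataFiles()
       .useStartingSequenceNumber(true)
       .execute();
   ```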



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,56 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =

Review Comment:
   aren't `stale1` and `stale2` referencing the same snapshot in this case? did you mean to load the table after each insert statement above?
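   i.e. load the table right after each insert so each handle points at a different snapshot, something like:
   
   ```java
   // sketch: capture a separate stale table handle after each insert
   sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
   Table stale1 =
       validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
   
   sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
   Table stale2 =
       validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
   ```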



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,56 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);
+
+    Assertions.assertThatThrownBy(
+            () ->
+                Actions.forTable(stale1)
+                    .rewriteDataFiles()
+                    .useStartingSequenceNumber(false)
+                    .execute(),
+            "Rewrite using new sequence number should fail")
+        .isInstanceOf(ValidationException.class);
+    // Rewrite using the starting sequence number should succeed

Review Comment:
   nit: add a new empty line to make it easy to see that these are two separate cases.
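   e.g. an empty line before the succeeding case (the second block here is only my guess at what the rest of the test looks like):
   
   ```java
   Assertions.assertThatThrownBy(
           () ->
               Actions.forTable(stale1)
                   .rewriteDataFiles()
                   .useStartingSequenceNumber(false)
                   .execute(),
           "Rewrite using new sequence number should fail")
       .isInstanceOf(ValidationException.class);
   
   // Rewrite using the starting sequence number should succeed
   Actions.forTable(stale2)
       .rewriteDataFiles()
       .useStartingSequenceNumber(true)
       .execute();
   ```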



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,56 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);

Review Comment:
   should we also add assertions on the file sequence numbers here?
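   Something like this, assuming `dataSequenceNumber()` is exposed on the content files in this version (untested sketch):
   
   ```java
   // the two initial data files were committed before the upsert, so their data
   // sequence numbers should be lower than the equality delete's
   long deleteSeq = Iterables.getOnlyElement(deleteFiles).dataSequenceNumber();
   Assert.assertEquals(
       "Two data files should have a lower sequence number than the equality delete",
       2,
       dataFiles.stream().filter(f -> f.dataSequenceNumber() < deleteSeq).count());
   ```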


