stevenzwu commented on code in PR #13245:
URL: https://github.com/apache/iceberg/pull/13245#discussion_r2178316080


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -377,11 +375,23 @@ private CopyOnWriteOperation(SparkCopyOnWriteScan scan, 
IsolationLevel isolation
       this.isolationLevel = isolationLevel;
     }
 
-    private List<DataFile> overwrittenFiles() {
+    private DataFileSet overwrittenFiles() {
+      if (scan == null) {
+        return DataFileSet.create();
+      } else {
+        return scan.tasks().stream()
+            .map(FileScanTask::file)
+            .collect(Collectors.toCollection(DataFileSet::create));
+      }
+    }
+
+    private DeleteFileSet rewritableDeletes() {
       if (scan == null) {
-        return ImmutableList.of();
+        return DeleteFileSet.create();
       } else {
-        return 
scan.tasks().stream().map(FileScanTask::file).collect(Collectors.toList());
+        return scan.tasks().stream()
+            .flatMap(task -> task.deletes().stream())

Review Comment:
   here, we don't filter on DVs as in `RewriteFileGroup`. is that intentional?



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -402,11 +412,10 @@ private Expression conflictDetectionFilter() {
     public void commit(WriterCommitMessage[] messages) {
       OverwriteFiles overwriteFiles = table.newOverwrite();
 
-      List<DataFile> overwrittenFiles = overwrittenFiles();
+      DataFileSet overwrittenFiles = overwrittenFiles();
       int numOverwrittenFiles = overwrittenFiles.size();
-      for (DataFile overwrittenFile : overwrittenFiles) {
-        overwriteFiles.deleteFile(overwrittenFile);
-      }
+      DeleteFileSet rewritableDeletes = rewritableDeletes();

Review Comment:
   nit: similar naming `orphanedDVs`?



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -172,14 +172,16 @@ public RewriteDataFiles.Result execute() {
         partialProgressEnabled
             ? doExecuteWithPartialProgress(plan, 
commitManager(startingSnapshotId))
             : doExecute(plan, commitManager(startingSnapshotId));
+    ImmutableRewriteDataFiles.Result result = resultBuilder.build();
 
     if (removeDanglingDeletes) {
       RemoveDanglingDeletesSparkAction action =
           new RemoveDanglingDeletesSparkAction(spark(), table);
       int removedCount = Iterables.size(action.execute().removedDeleteFiles());
-      resultBuilder.removedDeleteFilesCount(removedCount);
+      return 
result.withRemovedDeleteFilesCount(result.removedDeleteFilesCount() + 
removedCount);

Review Comment:
   can we rename the variables to make them more informative?
   ```
   removedCount -> removedDangling
   ```
   
   also add comment to explain `result.removedDeleteFilesCount()` captures the 
number of removed orphaned DVs



##########
core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java:
##########
@@ -74,19 +75,23 @@ public RewriteDataFilesCommitManager(
   public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
     DataFileSet rewrittenDataFiles = DataFileSet.create();
     DataFileSet addedDataFiles = DataFileSet.create();
+    DeleteFileSet rewritableDeletes = DeleteFileSet.create();

Review Comment:
   nit: similar to the other comment. call this `orphanedDeleteVectors`



##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -538,6 +542,75 @@ public void testBinPackWithDeletes() throws IOException {
     assertThat(actualRecords).as("7 rows are removed").hasSize(total - 7);
   }
 
+  @TestTemplate
+  public void removeOrphanedDVsFromDeleteManifest() throws Exception {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+    Table table = createTable();
+    int numDataFiles = 5;
+    // 100 / 5 = 20 records per data file
+    writeRecords(numDataFiles, 100);
+    int numPositionsToDelete = 10;
+    table.refresh();
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    assertThat(dataFiles).hasSize(numDataFiles);
+
+    RowDelta rowDelta = table.newRowDelta();
+    for (DataFile dataFile : dataFiles) {
+      writeDV(table, dataFile.partition(), dataFile.location(), 
numPositionsToDelete)
+          .forEach(rowDelta::addDeletes);
+    }
+
+    rowDelta.commit();
+
+    Set<DeleteFile> deleteFiles = TestHelpers.deleteFiles(table);
+    assertThat(deleteFiles).hasSize(numDataFiles);
+
+    for (ManifestFile manifestFile : 
table.currentSnapshot().deleteManifests(table.io())) {
+      Set<String> validDataFilePaths =
+          TestHelpers.dataFiles(table).stream()
+              .map(ContentFile::location)
+              .collect(Collectors.toSet());
+      ManifestReader<DeleteFile> reader =
+          ManifestFiles.readDeleteManifest(
+              manifestFile, table.io(), ((BaseTable) 
table).operations().current().specsById());
+      for (DeleteFile deleteFile : reader) {
+        // make sure there are no orphaned DVs
+        
assertThat(validDataFilePaths).contains(deleteFile.referencedDataFile());
+      }
+    }
+
+    // all data files should be rewritten. Set MIN_INPUT_FILES > to the number 
of data files so that
+    // compaction is only triggered when the delete ratio of >= 30% is hit
+    RewriteDataFiles.Result result =
+        SparkActions.get(spark)
+            .rewriteDataFiles(table)
+            .option(SizeBasedFileRewritePlanner.MIN_INPUT_FILES, "10")
+            .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0")
+            .execute();
+
+    assertThat(result.rewrittenDataFilesCount()).isEqualTo(numDataFiles);
+    assertThat(result.removedDeleteFilesCount()).isEqualTo(numDataFiles);
+
+    table.refresh();
+    assertThat(TestHelpers.dataFiles(table)).hasSize(1);
+    assertThat(TestHelpers.deleteFiles(table)).isEmpty();
+
+    Set<String> validDataFilePaths =

Review Comment:
   is line 598-611 needed? line 596 above already asserted deleteFiles is empty



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to