sririshindra commented on code in PR #4817:
URL: https://github.com/apache/iceberg/pull/4817#discussion_r881771794


##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -85,8 +85,33 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   interface Result {
     /**
+     * @deprecated since 0.14.0, will be removed in 0.15.0; use {@link 
#orphanFiles()} instead.
      * Returns locations of orphan files.
      */
+    @Deprecated

Review Comment:
   Instead of deprecating this method, can we add a default implementation that returns only the orphan file locations? This default implementation would call the `orphanFiles` method to get the `OrphanFileStatus` objects and reuse your implementation of `orphanFileLocations` in `BaseDeleteOrphanFilesActionResult`.
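   A minimal sketch of what that default method could look like (standalone stand-in types for illustration, not the actual Iceberg interfaces):

   ```java
   import java.util.List;
   import java.util.stream.Collectors;
   import java.util.stream.StreamSupport;

   public class DefaultMethodSketch {
     // Stand-in for DeleteOrphanFiles.OrphanFileStatus.
     interface OrphanFileStatus {
       String location();
     }

     // Stand-in for DeleteOrphanFiles.Result: orphanFileLocations() gets a
     // default implementation that delegates to orphanFiles(), so the old
     // method keeps working without being deprecated.
     interface Result {
       Iterable<OrphanFileStatus> orphanFiles();

       default Iterable<String> orphanFileLocations() {
         return StreamSupport.stream(orphanFiles().spliterator(), false)
             .map(OrphanFileStatus::location)
             .collect(Collectors.toList());
       }
     }

     public static void main(String[] args) {
       OrphanFileStatus status = () -> "s3://bucket/table/data/orphan.parquet";
       Result result = () -> List.of(status);
       // The default method derives the locations from the richer statuses.
       System.out.println(result.orphanFileLocations());
     }
   }
   ```

   With this shape, implementors only need to provide `orphanFiles()` and get `orphanFileLocations()` for free.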



##########
core/src/main/java/org/apache/iceberg/actions/BaseDeleteOrphanFilesActionResult.java:
##########
@@ -19,16 +19,26 @@
 
 package org.apache.iceberg.actions;
 
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
 public class BaseDeleteOrphanFilesActionResult implements 
DeleteOrphanFiles.Result {
 
-  private final Iterable<String> orphanFileLocations;
+  private final Iterable<DeleteOrphanFiles.OrphanFileStatus> 
orphanFileLocations;
 
-  public BaseDeleteOrphanFilesActionResult(Iterable<String> 
orphanFileLocations) {
+  public 
BaseDeleteOrphanFilesActionResult(Iterable<DeleteOrphanFiles.OrphanFileStatus> 
orphanFileLocations) {
     this.orphanFileLocations = orphanFileLocations;
   }
 
   @Override
   public Iterable<String> orphanFileLocations() {
+    return StreamSupport.stream(this.orphanFileLocations.spliterator(), false)

Review Comment:
   Can we make this `return StreamSupport.stream(this.orphanFileLocations.spliterator(), true)` so that we get a parallel stream? The order in which the file locations become available shouldn't matter, correct? In that case, we should probably use a parallel stream.
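   For reference, the only change is the boolean flag. A tiny standalone illustration (hypothetical data, not the Iceberg types): passing `true` to `StreamSupport.stream` requests a parallel stream, and since the source is ordered, `Collectors.toList()` still preserves encounter order, so the collected result is identical either way.

   ```java
   import java.util.List;
   import java.util.stream.Collectors;
   import java.util.stream.StreamSupport;

   public class ParallelStreamSketch {
     public static void main(String[] args) {
       Iterable<String> locations = List.of("a.parquet", "b.parquet", "c.parquet");

       // Sequential stream (flag = false).
       List<String> sequential = StreamSupport.stream(locations.spliterator(), false)
           .collect(Collectors.toList());

       // Parallel stream (flag = true): elements may be processed on multiple
       // threads, but toList() keeps the encounter order of the ordered source.
       List<String> parallel = StreamSupport.stream(locations.spliterator(), true)
           .collect(Collectors.toList());

       System.out.println(sequential.equals(parallel)); // true
     }
   }
   ```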



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java:
##########
@@ -315,6 +319,77 @@ public void orphanedFileRemovedWithParallelTasks() throws 
InterruptedException,
             Sets.newHashSet("remove-orphan-0", "remove-orphan-1", 
"remove-orphan-2", "remove-orphan-3"));
 
     Assert.assertEquals("Should delete 4 files", 4, deletedFiles.size());
+    assertDeletedStatusAndFailureCause(result.orphanFiles());
+  }
+
+  @Test
+  public void testOrphanFileDeleteThrowsException() {
+    Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), 
tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")
+    );
+    Dataset<Row> df1 = spark.createDataFrame(records1, 
ThreeColumnRecord.class).coalesce(1);
+
+    // original append
+    df1.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA")
+    );
+    Dataset<Row> df2 = spark.createDataFrame(records2, 
ThreeColumnRecord.class).coalesce(1);
+
+    // dynamic partition overwrite
+    df2.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("overwrite")
+        .save(tableLocation);
+
+    // second append
+    df2.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + 
"/data/c2_trunc=AA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + 
"/data/c2_trunc=AA/c3=AAAA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + 
"/data/invalid/invalid");
+
+    waitUntilAfter(System.currentTimeMillis());
+
+    String locationSubstringForException = "invalid";
+    DeleteOrphanFiles.Result result = 
SparkActions.get().deleteOrphanFiles(table)
+        .olderThan(System.currentTimeMillis())
+        .deleteWith(file -> {
+          if (file.contains(locationSubstringForException)) {
+            throw new RuntimeException("simulating failure during file 
deletion");
+          }
+          table.io().deleteFile(file);
+        })
+        .execute();
+
+    Assert.assertEquals("Should delete 4 files", 4, 
Iterables.size(result.orphanFiles()));
+
+    DeleteOrphanFiles.OrphanFileStatus fileStatus = 
StreamSupport.stream(result.orphanFiles().spliterator(), false)
+        .filter(status -> 
status.location().contains(locationSubstringForException))
+        .findFirst()
+        .get();
+    Assert.assertFalse("Deleted status should be false", fileStatus.deleted());
+    Assert.assertEquals("Failure cause should be present", "simulating failure 
during file deletion",
+        fileStatus.failureCause().getMessage());
+
+    List<DeleteOrphanFiles.OrphanFileStatus> deletedFileStatuses = 
StreamSupport.stream(
+            result.orphanFiles().spliterator(), false)
+        .filter(status -> 
!status.location().contains(locationSubstringForException))

Review Comment:
   Maybe we can call the `deleteOrphanFiles` action again after this step, but without throwing the simulated exception, and assert that the "invalid" files get deleted as well. I am not sure if it adds any value, but it is up to you.
   My thinking behind this suggestion is that deleting the orphan files should work once whatever underlying issues (permissions, network, etc.) are resolved. The `deleteOrphanFiles` action should then delete the files that previously failed to be deleted.
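   The retry expectation can be sketched in plain Java (hypothetical helper, no Spark/Iceberg dependencies): files that fail on the first pass get deleted on a second pass once the simulated failure is removed.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Predicate;

   public class RetryDeleteSketch {
     // Attempts to "delete" every file; files matching failOn are left behind,
     // mimicking a transient failure (permissions, network, etc.).
     static List<String> deleteAll(List<String> files, Predicate<String> failOn, List<String> deleted) {
       List<String> failed = new ArrayList<>();
       for (String file : files) {
         if (failOn.test(file)) {
           failed.add(file);
         } else {
           deleted.add(file);
         }
       }
       return failed;
     }

     public static void main(String[] args) {
       List<String> deleted = new ArrayList<>();
       List<String> orphans = List.of("data/a.parquet", "data/invalid/b.parquet");

       // First run: deletion of "invalid" files fails.
       List<String> failed = deleteAll(orphans, f -> f.contains("invalid"), deleted);

       // Second run over the leftovers: the underlying issue is resolved,
       // so the previously failed files are deleted too.
       failed = deleteAll(failed, f -> false, deleted);

       System.out.println(failed.isEmpty() && deleted.size() == 2); // true
     }
   }
   ```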



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java:
##########
@@ -955,4 +1040,24 @@ protected long waitUntilAfter(long timestampMillis) {
     }
     return current;
   }
+
+  protected List<String> 
toFileLocations(Iterable<DeleteOrphanFiles.OrphanFileStatus> fileStatuses) {
+    return StreamSupport.stream(fileStatuses.spliterator(), false)
+        .map(DeleteOrphanFiles.OrphanFileStatus::location)
+        .collect(Collectors.toList());
+  }
+
+  protected List<Exception> 
toNonNullFailureCauses(Iterable<DeleteOrphanFiles.OrphanFileStatus> 
fileStatuses) {
+    return StreamSupport.stream(fileStatuses.spliterator(), false)

Review Comment:
   ditto



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java:
##########
@@ -955,4 +1040,24 @@ protected long waitUntilAfter(long timestampMillis) {
     }
     return current;
   }
+
+  protected List<String> 
toFileLocations(Iterable<DeleteOrphanFiles.OrphanFileStatus> fileStatuses) {
+    return StreamSupport.stream(fileStatuses.spliterator(), false)
+        .map(DeleteOrphanFiles.OrphanFileStatus::location)
+        .collect(Collectors.toList());
+  }
+
+  protected List<Exception> 
toNonNullFailureCauses(Iterable<DeleteOrphanFiles.OrphanFileStatus> 
fileStatuses) {
+    return StreamSupport.stream(fileStatuses.spliterator(), false)
+        .map(DeleteOrphanFiles.OrphanFileStatus::failureCause)
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
+  protected void 
assertDeletedStatusAndFailureCause(Iterable<DeleteOrphanFiles.OrphanFileStatus> 
fileStatuses) {
+    Assert.assertTrue("Deleted status of orphan file should be true",
+        StreamSupport.stream(fileStatuses.spliterator(), 
false).allMatch(DeleteOrphanFiles.OrphanFileStatus::deleted));

Review Comment:
   Ditto



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -1459,12 +1459,12 @@ public void testRemoveOrphanFilesActionSupport() throws 
InterruptedException {
         .location(table.location() + "/metadata")
         .olderThan(System.currentTimeMillis())
         .execute();
-    Assert.assertTrue("Should not delete any metadata files", 
Iterables.isEmpty(result1.orphanFileLocations()));
+    Assert.assertTrue("Should not delete any metadata files", 
Iterables.isEmpty(result1.orphanFiles()));
 
     DeleteOrphanFiles.Result result2 = actions.deleteOrphanFiles(table)
         .olderThan(System.currentTimeMillis())
         .execute();
-    Assert.assertEquals("Should delete 1 data file", 1, 
Iterables.size(result2.orphanFileLocations()));

Review Comment:
   ditto



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -85,8 +85,33 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   interface Result {
     /**
+     * @deprecated since 0.14.0, will be removed in 0.15.0; use {@link 
#orphanFiles()} instead.
      * Returns locations of orphan files.
      */
+    @Deprecated
     Iterable<String> orphanFileLocations();
+
+    /**
+     * Returns orphan files.
+     */
+    Iterable<OrphanFileStatus> orphanFiles();

Review Comment:
   Maybe this method should be renamed to `orphanFileStatuses` as well, for accuracy.



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -1459,12 +1459,12 @@ public void testRemoveOrphanFilesActionSupport() throws 
InterruptedException {
         .location(table.location() + "/metadata")
         .olderThan(System.currentTimeMillis())
         .execute();
-    Assert.assertTrue("Should not delete any metadata files", 
Iterables.isEmpty(result1.orphanFileLocations()));

Review Comment:
   If we decide to keep the `orphanFileLocations` method without deprecating 
it, then we should keep this as is.



##########
core/src/main/java/org/apache/iceberg/actions/BaseDeleteOrphanFilesActionResult.java:
##########
@@ -19,16 +19,26 @@
 
 package org.apache.iceberg.actions;
 
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
 public class BaseDeleteOrphanFilesActionResult implements 
DeleteOrphanFiles.Result {
 
-  private final Iterable<String> orphanFileLocations;
+  private final Iterable<DeleteOrphanFiles.OrphanFileStatus> 
orphanFileLocations;

Review Comment:
   I think the name `orphanFileLocations` should be renamed to something like `orphanFileStatus` or `orphanFileStatuses`. This variable no longer represents orphan file locations.



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java:
##########
@@ -955,4 +1040,24 @@ protected long waitUntilAfter(long timestampMillis) {
     }
     return current;
   }
+
+  protected List<String> 
toFileLocations(Iterable<DeleteOrphanFiles.OrphanFileStatus> fileStatuses) {
+    return StreamSupport.stream(fileStatuses.spliterator(), false)

Review Comment:
   Can we make this `StreamSupport.stream(fileStatuses.spliterator(), true)` to use a parallel stream?



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java:
##########
@@ -309,10 +309,10 @@ public void testIgnoreMetadataFilesNotFound() {
     DeleteOrphanFiles.Result result =
         
sparkActions().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    Assert.assertEquals("Should delete 1 file", 1, 
Iterables.size(result.orphanFileLocations()));
+    Assert.assertEquals("Should delete 1 file", 1, 
Iterables.size(result.orphanFiles()));
     Assert.assertTrue("Should remove v1 file",
-        StreamSupport.stream(result.orphanFileLocations().spliterator(), false)
-            .anyMatch(file -> file.contains("v1.metadata.json")));
+        StreamSupport.stream(result.orphanFiles().spliterator(), false)

Review Comment:
   If we decide to keep the `orphanFileLocations` method without deprecating 
it, then we should keep this as is.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -222,14 +222,22 @@ private DeleteOrphanFiles.Result doExecute() {
         .as(Encoders.STRING())
         .collectAsList();
 
+    List<DeleteOrphanFiles.OrphanFileStatus> orphanFileStatuses = 
Lists.newArrayListWithCapacity(orphanFiles.size());
+
     Tasks.foreach(orphanFiles)
         .noRetry()
         .executeWith(deleteExecutorService)
         .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, 
exc))
-        .run(deleteFunc::accept);
-
-    return new BaseDeleteOrphanFilesActionResult(orphanFiles);
+        .onFailure((file, exc) -> {
+          LOG.warn("Failed to delete file: {}", file, exc);
+          orphanFileStatuses.add(new BaseOrphanFileStatus(file, /* deleted */ 
false, exc));

Review Comment:
   nit: maybe remove the comment. It's up to you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

