sririshindra commented on code in PR #4817:
URL: https://github.com/apache/iceberg/pull/4817#discussion_r881771794
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -85,8 +85,33 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
     */
   interface Result {
     /**
+     * @deprecated since 0.14.0, will be removed in 0.15.0; use {@link #orphanFiles()} instead.
      * Returns locations of orphan files.
      */
+    @Deprecated
Review Comment:
Instead of deprecating this method, can we add a default implementation that returns only the orphan file locations? This default implementation would call the `orphanFiles` method to get the `OrphanFileStatus` objects and reuse your implementation of `orphanFileLocations` from `BaseDeleteOrphanFilesActionResult`.
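A rough sketch of what that default method might look like (just an illustration, assuming `OrphanFileStatus` exposes the `location()` accessor added in this PR):
```java
// requires: import java.util.stream.Collectors; import java.util.stream.StreamSupport;
interface Result {
  /**
   * Returns locations of orphan files, derived from {@link #orphanFiles()}.
   */
  default Iterable<String> orphanFileLocations() {
    return StreamSupport.stream(orphanFiles().spliterator(), false)
        .map(OrphanFileStatus::location)
        .collect(Collectors.toList());
  }

  /**
   * Returns orphan files.
   */
  Iterable<OrphanFileStatus> orphanFiles();
}
```
That way implementations of `Result` only need to provide `orphanFiles()`, and the existing method keeps working without a deprecation cycle.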
##########
core/src/main/java/org/apache/iceberg/actions/BaseDeleteOrphanFilesActionResult.java:
##########
@@ -19,16 +19,26 @@
 package org.apache.iceberg.actions;

+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
 public class BaseDeleteOrphanFilesActionResult implements DeleteOrphanFiles.Result {

-  private final Iterable<String> orphanFileLocations;
+  private final Iterable<DeleteOrphanFiles.OrphanFileStatus> orphanFileLocations;

-  public BaseDeleteOrphanFilesActionResult(Iterable<String> orphanFileLocations) {
+  public BaseDeleteOrphanFilesActionResult(Iterable<DeleteOrphanFiles.OrphanFileStatus> orphanFileLocations) {
     this.orphanFileLocations = orphanFileLocations;
   }

   @Override
   public Iterable<String> orphanFileLocations() {
+    return StreamSupport.stream(this.orphanFileLocations.spliterator(), false)
Review Comment:
Can we make this `return StreamSupport.stream(this.orphanFileLocations.spliterator(), true)` so that we get a parallel stream? The order in which the file locations become available shouldn't matter, correct? In that case, we should probably use a parallel stream.
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java:
##########
@@ -315,6 +319,77 @@ public void orphanedFileRemovedWithParallelTasks() throws InterruptedException,
         Sets.newHashSet("remove-orphan-0", "remove-orphan-1", "remove-orphan-2", "remove-orphan-3"));
     Assert.assertEquals("Should delete 4 files", 4, deletedFiles.size());
+    assertDeletedStatusAndFailureCause(result.orphanFiles());
+  }
+
+  @Test
+  public void testOrphanFileDeleteThrowsException() {
+    Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")
+    );
+    Dataset<Row> df1 = spark.createDataFrame(records1, ThreeColumnRecord.class).coalesce(1);
+
+    // original append
+    df1.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA")
+    );
+    Dataset<Row> df2 = spark.createDataFrame(records2, ThreeColumnRecord.class).coalesce(1);
+
+    // dynamic partition overwrite
+    df2.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("overwrite")
+        .save(tableLocation);
+
+    // second append
+    df2.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/invalid/invalid");
+
+    waitUntilAfter(System.currentTimeMillis());
+
+    String locationSubstringForException = "invalid";
+    DeleteOrphanFiles.Result result = SparkActions.get().deleteOrphanFiles(table)
+        .olderThan(System.currentTimeMillis())
+        .deleteWith(file -> {
+          if (file.contains(locationSubstringForException)) {
+            throw new RuntimeException("simulating failure during file deletion");
+          }
+          table.io().deleteFile(file);
+        })
+        .execute();
+
+    Assert.assertEquals("Should delete 4 files", 4, Iterables.size(result.orphanFiles()));
+
+    DeleteOrphanFiles.OrphanFileStatus fileStatus = StreamSupport.stream(result.orphanFiles().spliterator(), false)
+        .filter(status -> status.location().contains(locationSubstringForException))
+        .findFirst()
+        .get();
+    Assert.assertFalse("Deleted status should be false", fileStatus.deleted());
+    Assert.assertEquals("Failure cause should be present", "simulating failure during file deletion",
+        fileStatus.failureCause().getMessage());
+
+    List<DeleteOrphanFiles.OrphanFileStatus> deletedFileStatuses = StreamSupport.stream(
+        result.orphanFiles().spliterator(), false)
+        .filter(status -> !status.location().contains(locationSubstringForException))
Review Comment:
Maybe we can call the `deleteOrphanFiles` action again after this step, but without throwing the simulated exception, and assert that the "invalid" files get deleted as well. I am not sure if it adds much value, but it is up to you.
My thinking behind this suggestion is that deleting the orphan files should succeed once whatever underlying issue (permissions, network, etc.) is resolved, so a later `deleteOrphanFiles` run should delete the files that previously failed to be deleted.
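Roughly, the follow-up could look like this (a sketch reusing the identifiers from the test above, with the default delete behavior so nothing throws):
```java
// Hypothetical second pass: no simulated failure this time, so the
// previously failed "invalid" files should now be reported as deleted.
DeleteOrphanFiles.Result retryResult = SparkActions.get().deleteOrphanFiles(table)
    .olderThan(System.currentTimeMillis())
    .execute();

Assert.assertTrue("Previously failed orphan files should be deleted on retry",
    StreamSupport.stream(retryResult.orphanFiles().spliterator(), false)
        .filter(status -> status.location().contains("invalid"))
        .allMatch(DeleteOrphanFiles.OrphanFileStatus::deleted));
```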
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java:
##########
@@ -955,4 +1040,24 @@ protected long waitUntilAfter(long timestampMillis) {
     }
     return current;
   }
+
+  protected List<String> toFileLocations(Iterable<DeleteOrphanFiles.OrphanFileStatus> fileStatuses) {
+    return StreamSupport.stream(fileStatuses.spliterator(), false)
+        .map(DeleteOrphanFiles.OrphanFileStatus::location)
+        .collect(Collectors.toList());
+  }
+
+  protected List<Exception> toNonNullFailureCauses(Iterable<DeleteOrphanFiles.OrphanFileStatus> fileStatuses) {
+    return StreamSupport.stream(fileStatuses.spliterator(), false)
Review Comment:
ditto
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java:
##########
@@ -955,4 +1040,24 @@ protected long waitUntilAfter(long timestampMillis) {
     }
     return current;
   }
+
+  protected List<String> toFileLocations(Iterable<DeleteOrphanFiles.OrphanFileStatus> fileStatuses) {
+    return StreamSupport.stream(fileStatuses.spliterator(), false)
+        .map(DeleteOrphanFiles.OrphanFileStatus::location)
+        .collect(Collectors.toList());
+  }
+
+  protected List<Exception> toNonNullFailureCauses(Iterable<DeleteOrphanFiles.OrphanFileStatus> fileStatuses) {
+    return StreamSupport.stream(fileStatuses.spliterator(), false)
+        .map(DeleteOrphanFiles.OrphanFileStatus::failureCause)
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
+  protected void assertDeletedStatusAndFailureCause(Iterable<DeleteOrphanFiles.OrphanFileStatus> fileStatuses) {
+    Assert.assertTrue("Deleted status of orphan file should be true",
+        StreamSupport.stream(fileStatuses.spliterator(), false).allMatch(DeleteOrphanFiles.OrphanFileStatus::deleted));
Review Comment:
Ditto
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -1459,12 +1459,12 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
         .location(table.location() + "/metadata")
         .olderThan(System.currentTimeMillis())
         .execute();
-    Assert.assertTrue("Should not delete any metadata files", Iterables.isEmpty(result1.orphanFileLocations()));
+    Assert.assertTrue("Should not delete any metadata files", Iterables.isEmpty(result1.orphanFiles()));

     DeleteOrphanFiles.Result result2 = actions.deleteOrphanFiles(table)
         .olderThan(System.currentTimeMillis())
         .execute();
-    Assert.assertEquals("Should delete 1 data file", 1, Iterables.size(result2.orphanFileLocations()));
Review Comment:
ditto
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -85,8 +85,33 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
     */
   interface Result {
     /**
+     * @deprecated since 0.14.0, will be removed in 0.15.0; use {@link #orphanFiles()} instead.
      * Returns locations of orphan files.
      */
+    @Deprecated
     Iterable<String> orphanFileLocations();
+
+    /**
+     * Returns orphan files.
+     */
+    Iterable<OrphanFileStatus> orphanFiles();
Review Comment:
Maybe this method should be renamed to `orphanFileStatuses` as well, for accuracy.
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -1459,12 +1459,12 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
         .location(table.location() + "/metadata")
         .olderThan(System.currentTimeMillis())
         .execute();
-    Assert.assertTrue("Should not delete any metadata files", Iterables.isEmpty(result1.orphanFileLocations()));
Review Comment:
If we decide to keep the `orphanFileLocations` method without deprecating
it, then we should keep this as is.
##########
core/src/main/java/org/apache/iceberg/actions/BaseDeleteOrphanFilesActionResult.java:
##########
@@ -19,16 +19,26 @@
 package org.apache.iceberg.actions;

+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
 public class BaseDeleteOrphanFilesActionResult implements DeleteOrphanFiles.Result {

-  private final Iterable<String> orphanFileLocations;
+  private final Iterable<DeleteOrphanFiles.OrphanFileStatus> orphanFileLocations;
Review Comment:
I think `orphanFileLocations` should be renamed to something like `orphanFileStatus` or `orphanFileStatuses`. This variable no longer represents orphan file locations.
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java:
##########
@@ -955,4 +1040,24 @@ protected long waitUntilAfter(long timestampMillis) {
     }
     return current;
   }
+
+  protected List<String> toFileLocations(Iterable<DeleteOrphanFiles.OrphanFileStatus> fileStatuses) {
+    return StreamSupport.stream(fileStatuses.spliterator(), false)
Review Comment:
Can we make this `StreamSupport.stream(fileStatuses.spliterator(), true)` so that it uses a parallel stream?
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java:
##########
@@ -309,10 +309,10 @@ public void testIgnoreMetadataFilesNotFound() {
     DeleteOrphanFiles.Result result =
         sparkActions().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();

-    Assert.assertEquals("Should delete 1 file", 1, Iterables.size(result.orphanFileLocations()));
+    Assert.assertEquals("Should delete 1 file", 1, Iterables.size(result.orphanFiles()));
     Assert.assertTrue("Should remove v1 file",
-        StreamSupport.stream(result.orphanFileLocations().spliterator(), false)
-            .anyMatch(file -> file.contains("v1.metadata.json")));
+        StreamSupport.stream(result.orphanFiles().spliterator(), false)
Review Comment:
If we decide to keep the `orphanFileLocations` method without deprecating
it, then we should keep this as is.
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -222,14 +222,22 @@ private DeleteOrphanFiles.Result doExecute() {
         .as(Encoders.STRING())
         .collectAsList();

+    List<DeleteOrphanFiles.OrphanFileStatus> orphanFileStatuses = Lists.newArrayListWithCapacity(orphanFiles.size());
+
     Tasks.foreach(orphanFiles)
         .noRetry()
         .executeWith(deleteExecutorService)
         .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
-
-    return new BaseDeleteOrphanFilesActionResult(orphanFiles);
+        .onFailure((file, exc) -> {
+          LOG.warn("Failed to delete file: {}", file, exc);
+          orphanFileStatuses.add(new BaseOrphanFileStatus(file, /* deleted */ false, exc));
Review Comment:
Nit: maybe remove the inline comment. It's up to you.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.