RussellSpitzer commented on a change in pull request #4451:
URL: https://github.com/apache/iceberg/pull/4451#discussion_r841088711



##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
##########
@@ -172,18 +173,34 @@ private String jobDesc() {
     Column nameEqual = actualFileName.equalTo(validFileName);
    Column actualContains = actualFileDF.col("file_path").contains(validFileDF.col("file_path"));
     Column joinCond = nameEqual.and(actualContains);
-    List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
-        .as(Encoders.STRING())
-        .collectAsList();
-
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
-
-    return new BaseDeleteOrphanFilesActionResult(orphanFiles);
+    Dataset<String> orphanDF = actualFileDF.join(validFileDF, joinCond, "leftanti")
+            .as(Encoders.STRING());
+    return deleteFiles(orphanDF);
+  }
+
+  private BaseDeleteOrphanFilesActionResult deleteFiles(Dataset<String> orphanDF) {
+    boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, false);
+    Iterable<String> itr;
+    if (streamResults) {
+      // cache df to avoid recomputing
+      orphanDF.cache();
+      itr = () -> orphanDF.toLocalIterator();
+    } else {
+      itr = orphanDF.collectAsList();
+    }
+    Tasks.foreach(itr)
+            .noRetry()
+            .executeWith(deleteExecutorService)
+            .suppressFailureWhenFinished()
+            .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
+            .run(file -> {
+              deleteFunc.accept(file);
+            });
+    if (streamResults) {

Review comment:
       This may end up being very expensive, since the code in the Procedure uses this iterable a bunch of times. It also ends up putting everything in memory when it builds the output rows ... InternalRow[] :/
   
   
   One thing I think we should still consider is moving all the delete operations to the executors. I know we had a little chat about this on Iceberg Slack, and I think it's worth exploring.
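   
   For context, the streaming pattern being discussed (consume an iterator lazily and hand each element to a bounded executor, instead of collecting everything first) can be sketched without Spark. This is a minimal stand-alone illustration only: the file paths are made up, and `deleteExecutorService` mirrors the field name in the PR, but the delete is simulated with a counter rather than a real `deleteFunc`.
   
   ```java
   import java.util.Iterator;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class StreamingDeleteSketch {
     public static void main(String[] args) throws Exception {
       // Hypothetical orphan file paths; in the PR these would come
       // lazily from orphanDF.toLocalIterator() rather than a List.
       List<String> orphanFiles = List.of(
           "s3://bucket/data/a.parquet",
           "s3://bucket/data/b.parquet",
           "s3://bucket/data/c.parquet");
       Iterator<String> itr = orphanFiles.iterator();
   
       // Bounded pool standing in for deleteExecutorService.
       ExecutorService deleteExecutorService = Executors.newFixedThreadPool(2);
       AtomicInteger deleted = new AtomicInteger();
   
       // Drain the iterator one element at a time; only the in-flight
       // tasks are held in memory, not the full result set.
       while (itr.hasNext()) {
         String file = itr.next();
         deleteExecutorService.submit(() -> {
           // Real action would call deleteFunc.accept(file) here.
           deleted.incrementAndGet();
         });
       }
   
       deleteExecutorService.shutdown();
       deleteExecutorService.awaitTermination(1, TimeUnit.MINUTES);
       System.out.println("deleted=" + deleted.get());
     }
   }
   ```
   
   The tradeoff the comment points at remains: once the result is needed more than once (as the Procedure does when building `InternalRow[]`), a one-shot iterator forces either recomputation or materializing everything anyway.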




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


