RussellSpitzer commented on code in PR #4451:
URL: https://github.com/apache/iceberg/pull/4451#discussion_r842179036
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -172,18 +173,34 @@ private String jobDesc() {
Column nameEqual = actualFileName.equalTo(validFileName);
Column actualContains =
actualFileDF.col("file_path").contains(validFileDF.col("file_path"));
Column joinCond = nameEqual.and(actualContains);
- List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
- .as(Encoders.STRING())
- .collectAsList();
-
- Tasks.foreach(orphanFiles)
- .noRetry()
- .executeWith(deleteExecutorService)
- .suppressFailureWhenFinished()
- .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
- .run(deleteFunc::accept);
-
- return new BaseDeleteOrphanFilesActionResult(orphanFiles);
+ Dataset<String> orphanDF = actualFileDF.join(validFileDF, joinCond, "leftanti")
+ .as(Encoders.STRING());
+ return deleteFiles(orphanDF);
+ }
+
+ private BaseDeleteOrphanFilesActionResult deleteFiles(Dataset<String> orphanDF) {
+ boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, false);
+ Iterable<String> itr;
+ if (streamResults) {
+ // cache df to avoid recomputing
+ orphanDF.cache();
+ itr = () -> orphanDF.toLocalIterator();
+ } else {
+ itr = orphanDF.collectAsList();
+ }
+ Tasks.foreach(itr)
+ .noRetry()
+ .executeWith(deleteExecutorService)
+ .suppressFailureWhenFinished()
+ .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
+ .run(file -> {
+ deleteFunc.accept(file);
+ });
+ if (streamResults) {
Review Comment:
@microbearz Changing that API for this may have to wait for a version
change. Your fix would change the behavior of the action version. Although
we probably can't keep caching the way we currently do, I think this would leave
the uncache to garbage collection of the object on the driver (if it gets
uncached at all), which, depending on the use case, could happen a very long
time after the action completes.
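The concern above is about *when* the cached data is released. A minimal sketch of the alternative, streaming the results through the delete function and releasing the cache in a `finally` block as soon as the deletes finish, is shown here. `CachedResults` and `StreamingDeleteSketch` are hypothetical stand-ins, not Iceberg or Spark APIs; `cache()` and `unpersist()` only borrow the names of Spark's `Dataset` methods so the example runs without Spark. Returning a count instead of the file list is also an assumption made for illustration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical stand-in for a cached Spark Dataset. cache() and unpersist()
 * only flip a flag here; in Spark they would persist and release data.
 */
class CachedResults implements Iterable<String> {
  private final List<String> rows;
  private boolean cached = false;

  CachedResults(List<String> rows) {
    this.rows = rows;
  }

  void cache() {
    cached = true;
  }

  void unpersist() {
    cached = false;
  }

  boolean isCached() {
    return cached;
  }

  @Override
  public Iterator<String> iterator() {
    // Stands in for a streaming iterator such as toLocalIterator().
    return rows.iterator();
  }
}

class StreamingDeleteSketch {
  /**
   * Streams each file path through deleteFunc without materializing the
   * whole list, then releases the cache deterministically when the action
   * finishes, instead of leaving it to driver-side garbage collection.
   * Returns the number of successful deletes (hypothetical result shape).
   */
  static int deleteAndRelease(CachedResults results, Consumer<String> deleteFunc) {
    results.cache();
    int deleted = 0;
    try {
      for (String file : results) {
        try {
          deleteFunc.accept(file);
          deleted++;
        } catch (RuntimeException e) {
          // Log and continue, loosely modeled on the diff's onFailure/LOG.warn.
          System.err.println("Failed to delete file: " + file);
        }
      }
    } finally {
      // Runs even if an unexpected Error escapes the loop.
      results.unpersist();
    }
    return deleted;
  }
}
```

Returning a lazy `Iterable` backed by the cached data would instead require the cache to outlive `deleteAndRelease`, which is exactly the case where releasing it falls to the caller or to garbage collection on the driver.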
--
This is an automated message from the Apache Git Service.