amogh-jahagirdar commented on code in PR #6682:
URL: https://github.com/apache/iceberg/pull/6682#discussion_r1089790123
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -85,6 +88,7 @@
private static final Logger LOG = LoggerFactory.getLogger(BaseSparkAction.class);
private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
private static final int DELETE_NUM_RETRIES = 3;
+ private static final int DELETE_GROUP_SIZE = 100000;
Review Comment:
Is it feasible to make this value configurable through the options passed in
to the action?
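For illustration, a minimal sketch of what that could look like, assuming the action's existing `options()` map; the `delete-group-size` option key and the `deleteGroupSize` helper are assumptions for this sketch, not existing Iceberg names:
```java
import java.util.Map;
import org.apache.iceberg.util.PropertyUtil;

// Hypothetical option key; not an existing Iceberg option name.
private static final String DELETE_GROUP_SIZE_OPTION = "delete-group-size";
private static final int DELETE_GROUP_SIZE_DEFAULT = 100000;

// Resolve the group size from the options passed to the action,
// falling back to the current hard-coded default.
private int deleteGroupSize(Map<String, String> options) {
  return PropertyUtil.propertyAsInt(options, DELETE_GROUP_SIZE_OPTION, DELETE_GROUP_SIZE_DEFAULT);
}
```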
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -98,6 +101,12 @@ public InternalRow[] call(InternalRow args) {
String location = args.isNullAt(2) ? null : args.getString(2);
boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);
+    if (maxConcurrentDeletes != null) {
+      LOG.warn(
+          "{} is now deprecated, parallelism should now be configured in the FileIO bulk "
+              + "operations. Check the configured FileIO for more information",
+          PARAMETERS[4].name());
+    }
Review Comment:
Style nit: add a blank line after the if block.
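Roughly like this (the trailing comment stands in for whatever statement follows in the procedure):
```java
if (maxConcurrentDeletes != null) {
  LOG.warn(
      "{} is now deprecated, parallelism should now be configured in the FileIO bulk "
          + "operations. Check the configured FileIO for more information",
      PARAMETERS[4].name());
}

// next statement follows after a blank line
```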
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -265,7 +285,15 @@ private Set<Long> findExpiredSnapshotIds(
}
private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
- DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+ DeleteSummary summary;
+ if (ops.io() instanceof SupportsBulkOperations) {
+ LOG.info("Triggering Bulk Delete Operations");
+ summary = deleteFiles(bulkDeleteFunc, files);
+ } else {
+ LOG.warn("Warning falling back to non-bulk deletes");
Review Comment:
Nit on the warn log: I think we should make it clearer why this is a warning,
so users don't think anything bad is happening:
"Falling back to non-bulk deletes. Bulk deletes are recommended for better
deletion throughput"
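A sketch of the reworded branch, assuming the non-bulk path carries over the `deleteFiles(deleteExecutorService, deleteFunc, files)` call from the removed line above:
```java
} else {
  // Not an error: bulk deletes are simply preferred for throughput.
  LOG.warn("Falling back to non-bulk deletes. Bulk deletes are recommended for better deletion throughput");
  summary = deleteFiles(deleteExecutorService, deleteFunc, files);
}
```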
##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +164,45 @@ public void deletePrefix(String prefix) {
}
}
+ @Override
+ public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+ AtomicInteger failureCount = new AtomicInteger(0);
+ Tasks.foreach(pathsToDelete)
+ .executeWith(executorService())
+ .retry(3)
+ .stopRetryOn(FileNotFoundException.class)
Review Comment:
Perhaps this is out of scope for this PR, since the other delete operations
also stop retrying only on FileNotFoundException, but there seem to be other
cases where we should stop retrying, such as permission-denied errors (see
https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L2848)
or AccessControlException in the Hadoop case.
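For example, something along these lines, assuming `Tasks.Builder#stopRetryOn` accepts multiple exception classes and adding Hadoop's `org.apache.hadoop.security.AccessControlException` (the rest of the chain is elided):
```java
import java.io.FileNotFoundException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.iceberg.util.Tasks;

Tasks.foreach(pathsToDelete)
    .executeWith(executorService())
    .retry(3)
    // Also stop retrying on permission errors: retries cannot
    // succeed until the underlying permissions change.
    .stopRetryOn(FileNotFoundException.class, AccessControlException.class)
    // ... rest of the chain unchanged from the PR
```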