szehon-ho commented on code in PR #5491:
URL: https://github.com/apache/iceberg/pull/5491#discussion_r942859272
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -200,6 +210,113 @@ protected Dataset<Row> loadMetadataTable(Table table,
MetadataTableType type) {
return SparkTableUtil.loadMetadataTable(spark, table, type);
}
+ /**
+ * Deletes files and keeps track of how many files were removed for each file type.
+ *
+ * <p>A file is counted in the returned summary only after the call to {@code deleteFunc} for
+ * its path returns normally. Failures are reported through the {@code onFailure} callback,
+ * which logs a warning with the file type and path; they are not counted.
+ *
+ * @param executorService an executor service to use for parallel deletes
+ * @param deleteFunc the function that deletes a single file, given its path
+ * @param files an iterator of Spark rows of the structure (path: String, type: String)
+ * @return stats on which files were deleted, broken down by file type
+ */
+ protected DeleteSummary deleteFiles(
+ ExecutorService executorService, Consumer<String> deleteFunc,
Iterator<Row> files) {
+
+ DeleteSummary summary = new DeleteSummary(); // may be updated concurrently by executor threads; its counters are AtomicLong
+
+ Tasks.foreach(files)
+ .retry(DELETE_NUM_RETRIES)
+ .stopRetryOn(NotFoundException.class) // presumably: retrying cannot help once the file is already gone
+ .suppressFailureWhenFinished() // NOTE(review): appears to keep delete failures from propagating to the caller (they are logged in onFailure) — confirm against Tasks
+ .executeWith(executorService)
+ .onFailure(
+ (fileInfo, exc) -> {
+ String path = fileInfo.getString(0); // column 0: file path
+ String type = fileInfo.getString(1); // column 1: file type
+ LOG.warn("Delete failed for {}: {}", type, path, exc);
+ })
+ .run(
+ fileInfo -> {
+ String path = fileInfo.getString(0);
+ String type = fileInfo.getString(1);
+ deleteFunc.accept(path);
+ summary.deletedFile(path, type); // reached only if deleteFunc did not throw
+ });
+
+ return summary;
+ }
+
+ static class DeleteSummary {
+ private final AtomicLong dataFilesCount = new AtomicLong(0L);
+ private final AtomicLong positionDeleteFilesCount = new AtomicLong(0L);
+ private final AtomicLong equalityDeleteFilesCount = new AtomicLong(0L);
+ private final AtomicLong manifestsCount = new AtomicLong(0L);
+ private final AtomicLong manifestListsCount = new AtomicLong(0L);
+ private final AtomicLong otherFilesCount = new AtomicLong(0L);
+
+ public void deletedFile(String path, String type) {
+ if (FileContent.DATA.name().equalsIgnoreCase(type)) {
+ dataFilesCount.incrementAndGet();
+ LOG.trace("Deleted data file: {}", path);
+
+ } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
+ positionDeleteFilesCount.incrementAndGet();
+ LOG.trace("Deleted positional delete file: {}", path);
+
+ } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
+ equalityDeleteFilesCount.incrementAndGet();
+ LOG.trace("Deleted equality delete file: {}", path);
+
+ } else if (MANIFEST.equalsIgnoreCase(type)) {
+ manifestsCount.incrementAndGet();
+ LOG.debug("Deleted manifest: {}", path);
Review Comment:
Hm, I just noticed that these logs use different levels: manifests are logged at debug, while data and delete files are logged at trace.
Should we have a follow-up to make them consistent?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]