baiyangtx commented on code in PR #2711:
URL: https://github.com/apache/incubator-amoro/pull/2711#discussion_r1554847422
##########
ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -495,28 +509,31 @@ private static int deleteInvalidFilesInFs(
} else {
String parentLocation = TableFileUtil.getParent(p.location());
String parentUriPath = TableFileUtil.getUriPath(parentLocation);
+ Set<String> filesToDelete = new HashSet<>();
if (!excludes.contains(uriPath)
&& !excludes.contains(parentUriPath)
&& p.createdAtMillis() < lastTime) {
- fio.deleteFile(p.location());
- deleteCount += 1;
+ filesToDelete.add(p.location());
}
+ int deletedCnt =
+ TableFileUtil.deleteFiles(table.name(), arcticFileIO(), filesToDelete, false);
Review Comment:
`SupportsFileSystemOperations` is an `ArcticFileIO`, so you can use `fio` directly.
Also, in `TableFileUtil.deleteFiles`, `tableName` is only used for logging. Is it necessary to pass that parameter in? If not, we wouldn't have to change the method to non-static.
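A minimal sketch of the suggested signature, assuming the table name is dropped and the caller passes its existing `fio`. `SimpleFileIO` and `StaticDeleteSketch` are hypothetical stand-ins (for `ArcticFileIO` and `TableFileUtil`), and `java.util.logging` stands in for the project's logger. Failures are logged by file path, so the helper needs no table instance and a static caller holding only `fio` can use it.

```java
import java.util.Set;
import java.util.logging.Logger;

public final class StaticDeleteSketch {
  /** Hypothetical stand-in for ArcticFileIO: only single-file deletion is modeled. */
  public interface SimpleFileIO {
    void deleteFile(String path);
  }

  private static final Logger LOG = Logger.getLogger(StaticDeleteSketch.class.getName());

  private StaticDeleteSketch() {}

  /**
   * Deletes the given files with the caller's own FileIO and returns how many succeeded.
   * No table name parameter: failures are logged by file path instead.
   */
  public static int deleteFiles(SimpleFileIO io, Set<String> files) {
    int deleted = 0;
    for (String path : files) {
      try {
        io.deleteFile(path);
        deleted++;
      } catch (RuntimeException e) {
        LOG.warning("Failed to delete " + path + ": " + e.getMessage());
      }
    }
    return deleted;
  }
}
```

The PR's bulk-deletion branch could stay inside the helper unchanged; only the `tableName` parameter and the instance lookups at the call site go away.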
##########
core/src/main/java/com/netease/arctic/utils/TableFileUtil.java:
##########
@@ -80,6 +84,56 @@ public static void deleteEmptyDirectory(
}
}
+ /**
+ * Helper to delete files. Bulk deletion is used if possible.
+ *
+ * @param tableName table name
+ * @param io arctic file io
+ * @param files files to delete
+ * @param concurrent controls concurrent deletion. Only applicable for non-bulk FileIO
+ * @return deleted file count
+ */
+ public static int deleteFiles(
+ String tableName, ArcticFileIO io, Set<String> files, boolean concurrent) {
+ AtomicInteger failedFileCnt = new AtomicInteger(0);
+ if (io.supportBulkOperations()) {
+ try {
+ io.asBulkFileIO().deleteFiles(files);
+ } catch (BulkDeletionFailureException e) {
+ failedFileCnt.set(e.numberFailedObjects());
+ LOG.warn("Failed to bulk delete {} files in {}", e.numberFailedObjects(), tableName);
+ } catch (RuntimeException e) {
+ failedFileCnt.set(files.size());
+ LOG.warn("Failed to bulk delete files in {}", tableName, e);
+ }
+ } else {
+ if (concurrent) {
+ Tasks.foreach(files)
+ .executeWith(ThreadPools.getWorkerPool())
Review Comment:
Why not pass the worker pool as a parameter to this utility method?
It is better for the caller of a utility method to maintain the thread pool.
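A sketch of the pool-as-parameter variant, assuming a caller-supplied `ExecutorService` replaces both the `concurrent` flag and the internal `ThreadPools.getWorkerPool()` call. `PoolParamDeleteSketch` and `SimpleFileIO` are hypothetical names; passing `null` means sequential deletion. The helper never creates or shuts down the pool, so its lifecycle stays with the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public final class PoolParamDeleteSketch {
  /** Hypothetical stand-in for ArcticFileIO: only single-file deletion is modeled. */
  public interface SimpleFileIO {
    void deleteFile(String path);
  }

  private PoolParamDeleteSketch() {}

  /**
   * Deletes files and returns how many succeeded. A non-null pool runs the deletions
   * concurrently; null runs them sequentially on the calling thread.
   * The pool belongs to the caller: this method never creates or shuts it down.
   */
  public static int deleteFiles(SimpleFileIO io, Set<String> files, ExecutorService pool) {
    AtomicInteger deleted = new AtomicInteger();
    if (pool == null) {
      for (String path : files) {
        if (tryDelete(io, path)) {
          deleted.incrementAndGet();
        }
      }
      return deleted.get();
    }
    List<Future<?>> futures = new ArrayList<>();
    for (String path : files) {
      futures.add(pool.submit(() -> {
        if (tryDelete(io, path)) {
          deleted.incrementAndGet();
        }
      }));
    }
    // Wait for every task so the returned count is complete.
    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break; // stop waiting; the count may be partial
      } catch (ExecutionException e) {
        // Only an Error could reach here, since tryDelete catches RuntimeException.
      }
    }
    return deleted.get();
  }

  private static boolean tryDelete(SimpleFileIO io, String path) {
    try {
      io.deleteFile(path);
      return true;
    } catch (RuntimeException e) {
      return false;
    }
  }
}
```

The maintainer could then pass whichever pool it already manages (or `null`), and tests can pass a small dedicated executor.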
##########
ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -297,21 +300,32 @@ protected void cleanContentFiles(long lastTime) {
// so acquire in advance
// to prevent repeated acquisition
Set<String> validFiles = orphanFileCleanNeedToExcludeFiles();
- LOG.info("{} start clean content files of change store", table.name());
+ LOG.info("{} start cleaning orphan files in content", table.name());
int deleteFilesCnt = clearInternalTableContentsFiles(lastTime, validFiles);
- LOG.info("{} total delete {} files from change store", table.name(), deleteFilesCnt);
+ runWithCondition(
+ deleteFilesCnt > 0,
+ () -> LOG.info("{} total delete {} orphan files in content", table.name(), deleteFilesCnt),
+ () -> LOG.info("{} doesn't have orphan files in content", table.name()));
Review Comment:
I don't think it's necessary to print different logs depending on the number of files that are successfully deleted. A consistent log format is more conducive to troubleshooting.
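For example, one unconditional message can cover the zero-file case too. In the maintainer this would be something like a single `LOG.info("{} total deleted {} orphan files in content", table.name(), deleteFilesCnt)`; the self-contained sketch below uses a hypothetical class name and `java.util.logging` in place of the project's logger.

```java
import java.util.logging.Logger;

public final class OrphanCleanLogSketch {
  private static final Logger LOG = Logger.getLogger(OrphanCleanLogSketch.class.getName());

  private OrphanCleanLogSketch() {}

  /** One template for every outcome, including zero deletions. */
  public static String summary(String tableName, int deletedCount) {
    return String.format("%s total deleted %d orphan files in content", tableName, deletedCount);
  }

  /** Logs unconditionally, so a single search pattern finds every cleaning run. */
  public static void logSummary(String tableName, int deletedCount) {
    LOG.info(summary(tableName, deletedCount));
  }
}
```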
##########
ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -495,28 +509,31 @@ private static int deleteInvalidFilesInFs(
} else {
String parentLocation = TableFileUtil.getParent(p.location());
String parentUriPath = TableFileUtil.getUriPath(parentLocation);
+ Set<String> filesToDelete = new HashSet<>();
if (!excludes.contains(uriPath)
&& !excludes.contains(parentUriPath)
&& p.createdAtMillis() < lastTime) {
- fio.deleteFile(p.location());
- deleteCount += 1;
+ filesToDelete.add(p.location());
}
+ int deletedCnt =
+ TableFileUtil.deleteFiles(table.name(), arcticFileIO(), filesToDelete, false);
Review Comment:
Alternatively, you could make `deleteInvalidFilesInFs` and `deleteInvalidFilesByPrefix` return the set of file paths that need to be deleted, and then delete them in `clearInternalTableContentsFiles` by calling `TableFileUtil.deleteFiles`.
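A rough sketch of the collect-then-delete shape, under several assumptions: `FileEntry`, `BulkDeleter`, and `clearContents` are hypothetical stand-ins for the listing results, `TableFileUtil.deleteFiles`, and `clearInternalTableContentsFiles`; `uriPath` and `parent` only approximate `TableFileUtil.getUriPath` and `TableFileUtil.getParent`; and both listing sources share the exclude/age filter from the diff above.

```java
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class CollectThenDeleteSketch {
  /** Hypothetical stand-in for a listed file: location plus creation time. */
  public static final class FileEntry {
    public final String location;
    public final long createdAtMillis;

    public FileEntry(String location, long createdAtMillis) {
      this.location = location;
      this.createdAtMillis = createdAtMillis;
    }
  }

  /** Hypothetical stand-in for TableFileUtil.deleteFiles: returns the deleted count. */
  public interface BulkDeleter {
    int deleteFiles(Set<String> paths);
  }

  private CollectThenDeleteSketch() {}

  // Approximates TableFileUtil.getUriPath: the path component of the location URI.
  static String uriPath(String location) {
    return URI.create(location).getPath();
  }

  // Approximates TableFileUtil.getParent: everything before the last '/'.
  static String parent(String location) {
    int idx = location.lastIndexOf('/');
    return idx < 0 ? location : location.substring(0, idx);
  }

  /** Same filter as the diff, but returns the paths instead of deleting them. */
  static Set<String> collectInvalidFiles(List<FileEntry> entries, Set<String> excludes, long lastTime) {
    Set<String> toDelete = new HashSet<>();
    for (FileEntry e : entries) {
      String path = uriPath(e.location);
      String parentPath = uriPath(parent(e.location));
      if (!excludes.contains(path) && !excludes.contains(parentPath) && e.createdAtMillis < lastTime) {
        toDelete.add(e.location);
      }
    }
    return toDelete;
  }

  /** Merges candidates from both listing sources, then deletes them in one call. */
  static int clearContents(List<FileEntry> fsListing, List<FileEntry> prefixListing,
      Set<String> excludes, long lastTime, BulkDeleter deleter) {
    Set<String> toDelete = new HashSet<>(collectInvalidFiles(fsListing, excludes, lastTime));
    toDelete.addAll(collectInvalidFiles(prefixListing, excludes, lastTime));
    return deleter.deleteFiles(toDelete);
  }
}
```

With the set built inside the per-file branch, as in the diff, each `TableFileUtil.deleteFiles` call covers at most one file; returning sets instead lets one bulk request cover everything both listings found.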
--
This is an automated message from the Apache Git Service.