baiyangtx commented on code in PR #2711:
URL: https://github.com/apache/amoro/pull/2711#discussion_r1559008409
##########
core/src/main/java/com/netease/arctic/utils/TableFileUtil.java:
##########
@@ -80,6 +84,81 @@ public static void deleteEmptyDirectory(
}
}
+ /**
+ * Helper to delete files. Bulk deletion is used if possible.
+ *
+ * @param io arctic file io
+ * @param files files to delete
+ * @param concurrent controls concurrent deletion. Only applicable for non-bulk FileIO
+ * @return deleted file count
+ */
+ public static int deleteFiles(
+ ArcticFileIO io, Set<String> files, boolean concurrent, ExecutorService svc) {
Review Comment:
I think the method signature could be simplified to this:
```suggestion
public static int deleteFiles(
ArcticFileIO io, Set<String> files, ExecutorService workpool) {
```
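With that signature, the `concurrent` flag becomes redundant: a `null` pool can mean sequential deletion, and a non-null pool means concurrent deletion. Below is a minimal sketch of the non-bulk path only. It assumes a hypothetical `Consumer<String>` standing in for per-file deletion through `ArcticFileIO`. It also assumes the return value counts files deleted without an exception. `DeleteFilesSketch` and `deleteQuietly` are illustrative names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical sketch: per-file deletion where a null pool means "run sequentially". */
final class DeleteFilesSketch {
  private DeleteFilesSketch() {}

  static int deleteFiles(Set<String> files, Consumer<String> deleteOne, ExecutorService workpool) {
    if (files == null || files.isEmpty()) {
      return 0;
    }
    AtomicInteger failed = new AtomicInteger(0);
    if (workpool == null) {
      // No pool supplied: delete one file after another on the calling thread.
      for (String path : files) {
        deleteQuietly(path, deleteOne, failed);
      }
    } else {
      // Pool supplied: submit one task per file, then wait for all of them.
      List<Future<?>> tasks = new ArrayList<>();
      for (String path : files) {
        tasks.add(workpool.submit(() -> deleteQuietly(path, deleteOne, failed)));
      }
      for (Future<?> task : tasks) {
        try {
          task.get(); // block so the failure count is final before returning
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        } catch (ExecutionException e) {
          // deleteQuietly swallows RuntimeExceptions, so only Errors end up here.
          throw new RuntimeException(e.getCause());
        }
      }
    }
    return files.size() - failed.get();
  }

  /** Deletes one path and counts (instead of propagating) a runtime failure. */
  private static void deleteQuietly(String path, Consumer<String> deleteOne, AtomicInteger failed) {
    try {
      deleteOne.accept(path);
    } catch (RuntimeException e) {
      failed.incrementAndGet();
    }
  }
}
```

Callers that want sequential deletion simply pass `null`. This keeps one parameter carrying both "whether" and "where" to run concurrently.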
##########
core/src/main/java/com/netease/arctic/utils/TableFileUtil.java:
##########
@@ -80,6 +84,81 @@ public static void deleteEmptyDirectory(
}
}
+ /**
+ * Helper to delete files. Bulk deletion is used if possible.
+ *
+ * @param io arctic file io
+ * @param files files to delete
+ * @param concurrent controls concurrent deletion. Only applicable for non-bulk FileIO
+ * @return deleted file count
+ */
+ public static int deleteFiles(
+ ArcticFileIO io, Set<String> files, boolean concurrent, ExecutorService svc) {
+ if (files == null || files.isEmpty()) {
+ return 0;
+ }
+
+ AtomicInteger failedFileCnt = new AtomicInteger(0);
+ if (io.supportBulkOperations()) {
+ try {
+ io.asBulkFileIO().deleteFiles(files);
+ } catch (BulkDeletionFailureException e) {
+ failedFileCnt.set(e.numberFailedObjects());
+ LOG.warn("Failed to bulk delete {} files", e.numberFailedObjects(), e);
+ } catch (RuntimeException e) {
+ failedFileCnt.set(files.size());
+ LOG.warn("Failed to bulk delete files", e);
+ }
+ } else {
+ if (concurrent) {
Review Comment:
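That's the point of the signature change: the `concurrent` flag can be replaced by a check on whether a pool was passed.
```suggestion
if (workpool != null) {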
```
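The bulk branch in this hunk records failures in `failedFileCnt`, but the diff stops before the `return`. The sketch below assumes the method returns `files.size()` minus the failure count; the diff does not show this. It mirrors the two `catch` blocks using hypothetical stand-ins. `PartialBulkFailure` plays the role of Iceberg's `BulkDeletionFailureException`, and `BulkDeleter` plays the role of a FileIO that supports bulk operations.

```java
import java.util.Set;

/** Hypothetical stand-in for BulkDeletionFailureException: reports how many objects failed. */
class PartialBulkFailure extends RuntimeException {
  private final int numberFailedObjects;

  PartialBulkFailure(int numberFailedObjects) {
    super(numberFailedObjects + " objects failed to delete");
    this.numberFailedObjects = numberFailedObjects;
  }

  int numberFailedObjects() {
    return numberFailedObjects;
  }
}

/** Hypothetical stand-in for a FileIO with bulk deletion support. */
interface BulkDeleter {
  void deleteFiles(Set<String> paths);
}

final class BulkDeleteSketch {
  private BulkDeleteSketch() {}

  /** Returns how many of the given files were deleted by a single bulk call. */
  static int bulkDelete(BulkDeleter io, Set<String> files) {
    if (files == null || files.isEmpty()) {
      return 0;
    }
    int failed;
    try {
      io.deleteFiles(files);
      failed = 0;
    } catch (PartialBulkFailure e) {
      // Partial failure: trust the count reported by the exception.
      failed = e.numberFailedObjects();
    } catch (RuntimeException e) {
      // Unknown outcome: conservatively treat every file as not deleted.
      failed = files.size();
    }
    return files.size() - failed;
  }
}
```

The more specific exception must be caught first. This is the same ordering the diff uses for `BulkDeletionFailureException` and `RuntimeException`.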
##########
ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -936,4 +971,12 @@ public Literal<Long> getTsBound() {
return tsBound;
}
}
+
+ private void runWithCondition(boolean condition, Runnable fun, Runnable other) {
+ if (condition) {
+ fun.run();
+ } else {
+ other.run();
+ }
+ }
Review Comment:
It seems that for every caller, `other` is always `() -> {}`, so that parameter could be dropped.
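If every call site passes `() -> {}`, the helper can shrink to a two-argument form that only runs `fun` when the condition holds. Below is a minimal sketch. The enclosing `Conditions` class is hypothetical; in the PR the method is a private member of `IcebergTableMaintainer`.

```java
/** Hypothetical holder for the simplified helper. */
final class Conditions {
  private Conditions() {}

  /** Runs {@code fun} only when {@code condition} is true; there is no else-branch. */
  static void runWithCondition(boolean condition, Runnable fun) {
    if (condition) {
      fun.run();
    }
  }
}
```

At that point a plain `if` at each call site is an equally valid choice. The helper mainly keeps each call site to a single expression.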
##########
ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -352,10 +368,18 @@ private int clearInternalTableContentsFiles(long lastTime, Set<String> exclude)
// dir.
if (io.supportFileSystemOperations()) {
SupportsFileSystemOperations fio = io.asFileSystemIO();
- return deleteInvalidFilesInFs(fio, dataLocation, lastTime, exclude);
+ Set<PathInfo> directories = new HashSet<>();
+ Set<String> filesToDelete =
+ deleteInvalidFilesInFs(fio, dataLocation, lastTime, exclude, directories);
+ int deleted = TableFileUtil.deleteFiles(io, filesToDelete);
+ /* delete empty directories */
+ deleteEmptyDirectories(fio, directories, lastTime, exclude);
+ return deleted;
Review Comment:
I mean that we should print a log like this:
```java
runWithCondition(
    !filesToDelete.isEmpty(),
    () -> LOG.info(
        "{}: There are {} files to be deleted, and {} files were deleted successfully.",
        table.name(), filesToDelete.size(), deleted));
```
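In this hunk the collected files are deleted before `deleteEmptyDirectories` runs. The order matters because a directory only becomes empty once its files are gone. The sketch below shows that ordering on a local file system. It assumes `java.nio.file` as a stand-in for `SupportsFileSystemOperations` and ignores the `lastTime` and `exclude` filters the real method takes. `EmptyDirCleanup` and its method are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch: delete files first, then prune directories left empty. */
final class EmptyDirCleanup {
  private EmptyDirCleanup() {}

  /** Deletes the files, then removes now-empty directories under root (root itself is kept). */
  static int deleteFilesThenEmptyDirs(Path root, List<Path> files) throws IOException {
    int deleted = 0;
    for (Path file : files) {
      if (Files.deleteIfExists(file)) {
        deleted++;
      }
    }
    // Collect directories deepest first, so a parent emptied by its children is removed too.
    List<Path> dirs;
    try (Stream<Path> walk = Files.walk(root)) {
      dirs = walk.filter(Files::isDirectory)
          .filter(dir -> !dir.equals(root))
          .sorted((a, b) -> Integer.compare(b.getNameCount(), a.getNameCount()))
          .collect(Collectors.toList());
    }
    for (Path dir : dirs) {
      if (isEmpty(dir)) {
        Files.delete(dir);
      }
    }
    return deleted;
  }

  private static boolean isEmpty(Path dir) throws IOException {
    try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
      return !entries.iterator().hasNext();
    }
  }
}
```

Running the directory pass first would leave behind every directory that still held a file at that moment.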