This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new f305d1f25 [AMORO-3608] Using rolling batch files cleaner to reduce
memory usage (#3630)
f305d1f25 is described below
commit f305d1f25273e2b4dea46c1b2385cbd59792b84a
Author: Xu Bai <[email protected]>
AuthorDate: Mon Sep 22 15:15:13 2025 +0800
[AMORO-3608] Using rolling batch files cleaner to reduce memory usage
(#3630)
* [WAP] Refactor expired file handling to delete files in groups
* [WAP] Rename ExpiredFileCleaner to RollingFileCleaner and update
references
* Exclude parent directory from file deletion in RollingFileCleaner
* address
* fix
---------
Co-authored-by: ZhouJinsong <[email protected]>
---
.../maintainer/IcebergTableMaintainer.java | 62 +++------
.../amoro/server/utils/RollingFileCleaner.java | 138 +++++++++++++++++++++
.../amoro/server/util/TestRollingFileCleaner.java | 53 ++++++++
.../java/org/apache/amoro/utils/TableFileUtil.java | 3 +
4 files changed, 213 insertions(+), 43 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
index a5269a02a..707df4666 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
@@ -32,6 +32,7 @@ import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableConfigurations;
import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
import org.apache.amoro.server.utils.IcebergTableUtil;
+import org.apache.amoro.server.utils.RollingFileCleaner;
import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Strings;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
@@ -39,7 +40,6 @@ import
org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.table.TableIdentifier;
import org.apache.amoro.utils.TableFileUtil;
-import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DataFile;
@@ -67,8 +67,8 @@ import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.SerializableFunction;
-import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,7 +91,6 @@ import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -201,54 +200,31 @@ public class IcebergTableMaintainer implements
TableMaintainer {
olderThan,
minCount,
exclude);
- final AtomicInteger toDeleteFiles = new AtomicInteger(0);
- Set<String> parentDirectories = new HashSet<>();
- Set<String> expiredFiles = new HashSet<>();
+ RollingFileCleaner expiredFileCleaner = new RollingFileCleaner(fileIO(),
exclude);
table
.expireSnapshots()
.retainLast(Math.max(minCount, 1))
.expireOlderThan(olderThan)
- .deleteWith(
- file -> {
- if (exclude.isEmpty()) {
- expiredFiles.add(file);
- } else {
- String fileUriPath = TableFileUtil.getUriPath(file);
- if (!exclude.contains(fileUriPath)
- && !exclude.contains(new
Path(fileUriPath).getParent().toString())) {
- expiredFiles.add(file);
- }
- }
-
- parentDirectories.add(new Path(file).getParent().toString());
- toDeleteFiles.incrementAndGet();
- })
+ .deleteWith(expiredFileCleaner::addFile)
.cleanExpiredFiles(
true) /* enable clean only for collecting the expired files, will
delete them later */
.commit();
- // try to batch delete files
- int deletedFiles =
- TableFileUtil.parallelDeleteFiles(fileIO(), expiredFiles,
ThreadPools.getWorkerPool());
-
- parentDirectories.forEach(
- parent -> {
- try {
- TableFileUtil.deleteEmptyDirectory(fileIO(), parent, exclude);
- } catch (Exception e) {
- // Ignore exceptions to remove as many directories as possible
- LOG.warn("Failed to delete empty directory {} for table {}",
parent, table.name(), e);
- }
- });
-
- runWithCondition(
- toDeleteFiles.get() > 0,
- () ->
- LOG.info(
- "Deleted {}/{} files for table {}",
- deletedFiles,
- toDeleteFiles.get(),
- getTable().name()));
+ int collectedFiles = expiredFileCleaner.fileCount();
+ expiredFileCleaner.clear();
+ if (collectedFiles > 0) {
+ LOG.info(
+ "Expired {}/{} files for table {} order than {}",
+ collectedFiles,
+ expiredFileCleaner.cleanedFileCount(),
+ table.name(),
+ DateTimeUtil.formatTimestampMillis(olderThan));
+ } else {
+ LOG.debug(
+ "No expired files found for table {} order than {}",
+ table.name(),
+ DateTimeUtil.formatTimestampMillis(olderThan));
+ }
}
@Override
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/utils/RollingFileCleaner.java
b/amoro-ams/src/main/java/org/apache/amoro/server/utils/RollingFileCleaner.java
new file mode 100644
index 000000000..74a977a7a
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/utils/RollingFileCleaner.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.utils;
+
+import org.apache.amoro.io.AuthenticatedFileIO;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.utils.TableFileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RollingFileCleaner {
+ private final Set<String> collectedFiles = Sets.newConcurrentHashSet();
+ private final Set<String> excludeFiles;
+ private final Set<String> parentDirectories = Sets.newConcurrentHashSet();
+
+ private static final int CLEANED_FILE_GROUP_SIZE = 1_000;
+
+ private final AtomicInteger fileCounter = new AtomicInteger(0);
+ private final AtomicInteger cleanedFileCounter = new AtomicInteger(0);
+
+ private final AuthenticatedFileIO fileIO;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RollingFileCleaner.class);
+
+ public RollingFileCleaner(AuthenticatedFileIO fileIO, Set<String>
excludeFiles) {
+ this.fileIO = fileIO;
+ this.excludeFiles =
+ excludeFiles != null
+ ? Sets.newConcurrentHashSet(excludeFiles)
+ : Sets.newConcurrentHashSet();
+ }
+
+ public void addFile(String filePath) {
+ if (isFileExcluded(filePath)) {
+ LOG.debug("File {} is excluded from cleaning", filePath);
+ return;
+ }
+
+ collectedFiles.add(filePath);
+ String parentDir = new
Path(URI.create(filePath).getPath()).getParent().toString();
+ parentDirectories.add(parentDir);
+ int currentCount = fileCounter.incrementAndGet();
+
+ if (currentCount % CLEANED_FILE_GROUP_SIZE == 0) {
+ doCleanFiles();
+ }
+ }
+
+ private boolean isFileExcluded(String filePath) {
+ if (excludeFiles.isEmpty()) {
+ return false;
+ }
+
+ if (excludeFiles.contains(filePath)) {
+ return true;
+ }
+
+ String uriPath = URI.create(filePath).getPath();
+ if (excludeFiles.contains(uriPath)) {
+ return true;
+ }
+
+ String parentPath = new Path(uriPath).getParent().toString();
+ return excludeFiles.contains(parentPath);
+ }
+
+ private void doCleanFiles() {
+ if (collectedFiles.isEmpty()) {
+ return;
+ }
+
+ if (fileIO.supportBulkOperations()) {
+ try {
+ fileIO.asBulkFileIO().deleteFiles(collectedFiles);
+ cleanedFileCounter.addAndGet(collectedFiles.size());
+ } catch (BulkDeletionFailureException e) {
+ LOG.warn("Failed to delete {} expired files in bulk",
e.numberFailedObjects());
+ }
+ } else {
+ for (String filePath : collectedFiles) {
+ try {
+ fileIO.deleteFile(filePath);
+ cleanedFileCounter.incrementAndGet();
+ } catch (Exception e) {
+ LOG.warn("Failed to delete expired file: {}", filePath, e);
+ }
+ }
+ }
+ // Try to delete empty parent directories
+ for (String parentDir : parentDirectories) {
+ TableFileUtil.deleteEmptyDirectory(fileIO, parentDir, excludeFiles);
+ }
+ parentDirectories.clear();
+
+ LOG.debug("Cleaned expired a file group, total files: {}",
collectedFiles.size());
+
+ collectedFiles.clear();
+ }
+
+ public int fileCount() {
+ return fileCounter.get();
+ }
+
+ public int cleanedFileCount() {
+ return cleanedFileCounter.get();
+ }
+
+ public void clear() {
+ if (!collectedFiles.isEmpty()) {
+ doCleanFiles();
+ }
+
+ collectedFiles.clear();
+ excludeFiles.clear();
+ }
+}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/util/TestRollingFileCleaner.java
b/amoro-ams/src/test/java/org/apache/amoro/server/util/TestRollingFileCleaner.java
new file mode 100644
index 000000000..3bc562bc1
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/util/TestRollingFileCleaner.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.util;
+
+import org.apache.amoro.io.AuthenticatedFileIO;
+import org.apache.amoro.io.AuthenticatedFileIOAdapter;
+import org.apache.amoro.server.utils.RollingFileCleaner;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+
+public class TestRollingFileCleaner {
+
+ @Test
+ void testCleanFiles() {
+ InMemoryFileIO io = new InMemoryFileIO();
+ AuthenticatedFileIO fileIO = new AuthenticatedFileIOAdapter(io);
+ RollingFileCleaner fileCleaner = new RollingFileCleaner(fileIO,
Sets.newHashSet());
+ // generate some files
+ Set<String> expiredFiles = Sets.newHashSet();
+ for (int i = 0; i < 5050; i++) {
+ String filePath = "file_" + i + ".txt";
+ io.addFile(filePath, ("file_content" + i).getBytes());
+ expiredFiles.add(filePath);
+ fileCleaner.addFile(filePath);
+ }
+
+ Assertions.assertEquals(expiredFiles.size(), fileCleaner.fileCount());
+
+ Assertions.assertEquals(5000, fileCleaner.cleanedFileCount());
+ fileCleaner.clear();
+ Assertions.assertEquals(5050, fileCleaner.cleanedFileCount());
+ }
+}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java
index ea972067b..62d7ea651 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java
@@ -84,6 +84,9 @@ public class TableFileUtil {
*/
public static void deleteEmptyDirectory(
AuthenticatedFileIO io, String directoryPath, Set<String> exclude) {
+ if (directoryPath == null || directoryPath.isEmpty()) {
+ return;
+ }
if (!io.exists(directoryPath)) {
LOG.debug("The target directory {} does not exist or has been deleted",
directoryPath);
return;