This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new f305d1f25 [AMORO-3608] Using rolling batch files cleaner to reduce 
memory usage (#3630)
f305d1f25 is described below

commit f305d1f25273e2b4dea46c1b2385cbd59792b84a
Author: Xu Bai <[email protected]>
AuthorDate: Mon Sep 22 15:15:13 2025 +0800

    [AMORO-3608] Using rolling batch files cleaner to reduce memory usage 
(#3630)
    
    * [WAP] Refactor expired file handling to delete files in group
    
    * [WAP] Rename ExpiredFileCleaner to RollingFileCleaner and update 
references
    
    * Exclude parent directory from file deletion in RollingFileCleaner
    
    * address
    
    * fix
    
    ---------
    
    Co-authored-by: ZhouJinsong <[email protected]>
---
 .../maintainer/IcebergTableMaintainer.java         |  62 +++------
 .../amoro/server/utils/RollingFileCleaner.java     | 138 +++++++++++++++++++++
 .../amoro/server/util/TestRollingFileCleaner.java  |  53 ++++++++
 .../java/org/apache/amoro/utils/TableFileUtil.java |   3 +
 4 files changed, 213 insertions(+), 43 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
index a5269a02a..707df4666 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
@@ -32,6 +32,7 @@ import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableConfigurations;
 import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
 import org.apache.amoro.server.utils.IcebergTableUtil;
+import org.apache.amoro.server.utils.RollingFileCleaner;
 import 
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.shade.guava32.com.google.common.base.Strings;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
@@ -39,7 +40,6 @@ import 
org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
 import org.apache.amoro.table.TableIdentifier;
 import org.apache.amoro.utils.TableFileUtil;
-import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.ContentScanTask;
 import org.apache.iceberg.DataFile;
@@ -67,8 +67,8 @@ import org.apache.iceberg.io.SupportsPrefixOperations;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.iceberg.util.SerializableFunction;
-import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,7 +91,6 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.LinkedTransferQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -201,54 +200,31 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
         olderThan,
         minCount,
         exclude);
-    final AtomicInteger toDeleteFiles = new AtomicInteger(0);
-    Set<String> parentDirectories = new HashSet<>();
-    Set<String> expiredFiles = new HashSet<>();
+    RollingFileCleaner expiredFileCleaner = new RollingFileCleaner(fileIO(), 
exclude);
     table
         .expireSnapshots()
         .retainLast(Math.max(minCount, 1))
         .expireOlderThan(olderThan)
-        .deleteWith(
-            file -> {
-              if (exclude.isEmpty()) {
-                expiredFiles.add(file);
-              } else {
-                String fileUriPath = TableFileUtil.getUriPath(file);
-                if (!exclude.contains(fileUriPath)
-                    && !exclude.contains(new 
Path(fileUriPath).getParent().toString())) {
-                  expiredFiles.add(file);
-                }
-              }
-
-              parentDirectories.add(new Path(file).getParent().toString());
-              toDeleteFiles.incrementAndGet();
-            })
+        .deleteWith(expiredFileCleaner::addFile)
         .cleanExpiredFiles(
             true) /* enable clean only for collecting the expired files, will 
delete them later */
         .commit();
 
-    // try to batch delete files
-    int deletedFiles =
-        TableFileUtil.parallelDeleteFiles(fileIO(), expiredFiles, 
ThreadPools.getWorkerPool());
-
-    parentDirectories.forEach(
-        parent -> {
-          try {
-            TableFileUtil.deleteEmptyDirectory(fileIO(), parent, exclude);
-          } catch (Exception e) {
-            // Ignore exceptions to remove as many directories as possible
-            LOG.warn("Failed to delete empty directory {} for table {}", 
parent, table.name(), e);
-          }
-        });
-
-    runWithCondition(
-        toDeleteFiles.get() > 0,
-        () ->
-            LOG.info(
-                "Deleted {}/{} files for table {}",
-                deletedFiles,
-                toDeleteFiles.get(),
-                getTable().name()));
+    int collectedFiles = expiredFileCleaner.fileCount();
+    expiredFileCleaner.clear();
+    if (collectedFiles > 0) {
+      LOG.info(
+          "Expired {}/{} files for table {} older than {}",
+          collectedFiles,
+          expiredFileCleaner.cleanedFileCount(),
+          table.name(),
+          DateTimeUtil.formatTimestampMillis(olderThan));
+    } else {
+      LOG.debug(
+          "No expired files found for table {} older than {}",
+          table.name(),
+          DateTimeUtil.formatTimestampMillis(olderThan));
+    }
   }
 
   @Override
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/utils/RollingFileCleaner.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/utils/RollingFileCleaner.java
new file mode 100644
index 000000000..74a977a7a
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/utils/RollingFileCleaner.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.utils;
+
+import org.apache.amoro.io.AuthenticatedFileIO;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.utils.TableFileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RollingFileCleaner {
+  private final Set<String> collectedFiles = Sets.newConcurrentHashSet();
+  private final Set<String> excludeFiles;
+  private final Set<String> parentDirectories = Sets.newConcurrentHashSet();
+
+  private static final int CLEANED_FILE_GROUP_SIZE = 1_000;
+
+  private final AtomicInteger fileCounter = new AtomicInteger(0);
+  private final AtomicInteger cleanedFileCounter = new AtomicInteger(0);
+
+  private final AuthenticatedFileIO fileIO;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RollingFileCleaner.class);
+
+  public RollingFileCleaner(AuthenticatedFileIO fileIO, Set<String> 
excludeFiles) {
+    this.fileIO = fileIO;
+    this.excludeFiles =
+        excludeFiles != null
+            ? Sets.newConcurrentHashSet(excludeFiles)
+            : Sets.newConcurrentHashSet();
+  }
+
+  public void addFile(String filePath) {
+    if (isFileExcluded(filePath)) {
+      LOG.debug("File {} is excluded from cleaning", filePath);
+      return;
+    }
+
+    collectedFiles.add(filePath);
+    String parentDir = new 
Path(URI.create(filePath).getPath()).getParent().toString();
+    parentDirectories.add(parentDir);
+    int currentCount = fileCounter.incrementAndGet();
+
+    if (currentCount % CLEANED_FILE_GROUP_SIZE == 0) {
+      doCleanFiles();
+    }
+  }
+
+  private boolean isFileExcluded(String filePath) {
+    if (excludeFiles.isEmpty()) {
+      return false;
+    }
+
+    if (excludeFiles.contains(filePath)) {
+      return true;
+    }
+
+    String uriPath = URI.create(filePath).getPath();
+    if (excludeFiles.contains(uriPath)) {
+      return true;
+    }
+
+    String parentPath = new Path(uriPath).getParent().toString();
+    return excludeFiles.contains(parentPath);
+  }
+
+  private void doCleanFiles() {
+    if (collectedFiles.isEmpty()) {
+      return;
+    }
+
+    if (fileIO.supportBulkOperations()) {
+      try {
+        fileIO.asBulkFileIO().deleteFiles(collectedFiles);
+        cleanedFileCounter.addAndGet(collectedFiles.size());
+      } catch (BulkDeletionFailureException e) {
+        LOG.warn("Failed to delete {} expired files in bulk", 
e.numberFailedObjects());
+      }
+    } else {
+      for (String filePath : collectedFiles) {
+        try {
+          fileIO.deleteFile(filePath);
+          cleanedFileCounter.incrementAndGet();
+        } catch (Exception e) {
+          LOG.warn("Failed to delete expired file: {}", filePath, e);
+        }
+      }
+    }
+    // Try to delete empty parent directories
+    for (String parentDir : parentDirectories) {
+      TableFileUtil.deleteEmptyDirectory(fileIO, parentDir, excludeFiles);
+    }
+    parentDirectories.clear();
+
+    LOG.debug("Cleaned an expired file group, total files: {}", 
collectedFiles.size());
+
+    collectedFiles.clear();
+  }
+
+  public int fileCount() {
+    return fileCounter.get();
+  }
+
+  public int cleanedFileCount() {
+    return cleanedFileCounter.get();
+  }
+
+  public void clear() {
+    if (!collectedFiles.isEmpty()) {
+      doCleanFiles();
+    }
+
+    collectedFiles.clear();
+    excludeFiles.clear();
+  }
+}
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/util/TestRollingFileCleaner.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/util/TestRollingFileCleaner.java
new file mode 100644
index 000000000..3bc562bc1
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/util/TestRollingFileCleaner.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.util;
+
+import org.apache.amoro.io.AuthenticatedFileIO;
+import org.apache.amoro.io.AuthenticatedFileIOAdapter;
+import org.apache.amoro.server.utils.RollingFileCleaner;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+
+public class TestRollingFileCleaner {
+
+  @Test
+  void testCleanFiles() {
+    InMemoryFileIO io = new InMemoryFileIO();
+    AuthenticatedFileIO fileIO = new AuthenticatedFileIOAdapter(io);
+    RollingFileCleaner fileCleaner = new RollingFileCleaner(fileIO, 
Sets.newHashSet());
+    // generate some files
+    Set<String> expiredFiles = Sets.newHashSet();
+    for (int i = 0; i < 5050; i++) {
+      String filePath = "file_" + i + ".txt";
+      io.addFile(filePath, ("file_content" + i).getBytes());
+      expiredFiles.add(filePath);
+      fileCleaner.addFile(filePath);
+    }
+
+    Assertions.assertEquals(expiredFiles.size(), fileCleaner.fileCount());
+
+    Assertions.assertEquals(5000, fileCleaner.cleanedFileCount());
+    fileCleaner.clear();
+    Assertions.assertEquals(5050, fileCleaner.cleanedFileCount());
+  }
+}
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java
index ea972067b..62d7ea651 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java
@@ -84,6 +84,9 @@ public class TableFileUtil {
    */
   public static void deleteEmptyDirectory(
       AuthenticatedFileIO io, String directoryPath, Set<String> exclude) {
+    if (directoryPath == null || directoryPath.isEmpty()) {
+      return;
+    }
     if (!io.exists(directoryPath)) {
       LOG.debug("The target directory {} does not exist or has been deleted", 
directoryPath);
       return;

Reply via email to