This is an automated email from the ASF dual-hosted git repository.

zhoujinsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c5d3d84c [AMORO-4216] Refactor dangling-delete-files-cleaning via 
ProcessFactory plugin (#4214)
0c5d3d84c is described below

commit 0c5d3d84c6ba544adb10bdd4d4893de489e3dd4b
Author: WenLingzhang <[email protected]>
AuthorDate: Tue May 19 10:24:53 2026 +0800

    [AMORO-4216] Refactor dangling-delete-files-cleaning via ProcessFactory 
plugin (#4214)
    
    * Refactor dangling-delete-files-cleaning via ProcessFactory plugin
    
    # Conflicts:
    #       
amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
    #       
amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
    #       
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
    
    * Fix IcebergActions init failure by increasing MAX_NAME_LENGTH to 32
    
    * rename action to clean-dangling-delete-files to fix pool tag mismatch
    
    * Fix: LocalProcess exceptions were swallowed, preventing state from being 
updated to FAILED
    
    Problem Analysis:
    - In DanglingDeleteFilesCleaningProcess, SnapshotsExpiringProcess, and 
OrphanFilesCleaningProcess, exceptions in the run() method were caught but not 
re-thrown
    - This caused LocalExecutionEngine.ProcessHolder.onComplete() to never 
detect failures
    - Process status was always set to SUCCESS even when execution actually 
failed
    
    Fix:
    - Add  in the catch block to re-throw exceptions
    - Keep existing logging for troubleshooting
    - Ensure exceptions properly propagate to ProcessHolder so status is 
correctly updated to FAILED
    
    Modified Files:
    - amoro-ams/src/main/java/.../DanglingDeleteFilesCleaningProcess.java
    - amoro-ams/src/main/java/.../SnapshotsExpiringProcess.java
    - amoro-ams/src/main/java/.../OrphanFilesCleaningProcess.java
    
    * Fix logging message for dangling delete files error
    
    * Fix formatting of class documentation comment
    
    * fixup style
    
    ---------
    
    Co-authored-by: 张文领 <[email protected]>
---
 .../apache/amoro/server/AmoroManagementConf.java   | 18 -----
 .../apache/amoro/server/AmoroServiceContainer.java |  1 -
 ...ava => DanglingDeleteFilesCleaningProcess.java} | 21 +++--
 .../process/iceberg/IcebergProcessFactory.java     | 34 ++++++++
 .../iceberg/OrphanFilesCleaningProcess.java        |  1 +
 .../process/iceberg/SnapshotsExpiringProcess.java  |  3 +-
 .../DanglingDeleteFilesCleaningExecutor.java       | 92 ----------------------
 .../scheduler/inline/InlineTableExecutors.java     | 12 ---
 .../amoro/server/table/DefaultTableRuntime.java    |  5 --
 .../server/table/cleanup/CleanupOperation.java     |  1 -
 .../table/cleanup/TableRuntimeCleanupState.java    |  4 +-
 .../process/iceberg/TestIcebergProcessFactory.java | 18 +++++
 .../inline/PeriodicTableSchedulerTestBase.java     |  3 -
 .../inline/TestConfigurableIntervalExecutors.java  | 39 +--------
 .../inline/TestPeriodicTableSchedulerCleanup.java  | 15 +---
 .../src/main/java/org/apache/amoro/Action.java     |  2 +-
 .../main/java/org/apache/amoro/IcebergActions.java |  1 +
 .../amoro/process/TestLocalExecutionEngine.java    |  2 +
 charts/amoro/templates/amoro-configmap.yaml        |  4 -
 dist/src/main/amoro-bin/conf/config.yaml           |  4 -
 .../amoro-bin/conf/plugins/execute-engines.yaml    |  1 +
 .../amoro-bin/conf/plugins/process-factories.yaml  |  2 +
 docs/admin-guides/deployment.md                    |  3 +
 docs/configuration/ams-config.md                   |  3 -
 24 files changed, 85 insertions(+), 204 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 49eade550..759b76700 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -174,24 +174,6 @@ public class AmoroManagementConf {
           .defaultValue(Duration.ofHours(1))
           .withDescription("Interval for expiring snapshots.");
 
-  public static final ConfigOption<Boolean> 
CLEAN_DANGLING_DELETE_FILES_ENABLED =
-      ConfigOptions.key("clean-dangling-delete-files.enabled")
-          .booleanType()
-          .defaultValue(true)
-          .withDescription("Enable dangling delete files cleaning.");
-
-  public static final ConfigOption<Integer> 
CLEAN_DANGLING_DELETE_FILES_THREAD_COUNT =
-      ConfigOptions.key("clean-dangling-delete-files.thread-count")
-          .intType()
-          .defaultValue(10)
-          .withDescription("The number of threads used for dangling delete 
files cleaning.");
-
-  public static final ConfigOption<Duration> 
CLEAN_DANGLING_DELETE_FILES_INTERVAL =
-      ConfigOptions.key("clean-dangling-delete-files.interval")
-          .durationType()
-          .defaultValue(Duration.ofDays(1))
-          .withDescription("Interval for cleaning dangling delete files.");
-
   public static final ConfigOption<Boolean> SYNC_HIVE_TABLES_ENABLED =
       ConfigOptions.key("sync-hive-tables.enabled")
           .booleanType()
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index d174e559e..27615ab31 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -290,7 +290,6 @@ public class AmoroServiceContainer {
     addHandlerChain(optimizingService.getTableRuntimeHandler());
     addHandlerChain(processService.getTableHandlerChain());
     
addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor());
-    
addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
     
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
     
addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor());
     
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DanglingDeleteFilesCleaningProcess.java
similarity index 74%
copy from 
amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
copy to 
amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DanglingDeleteFilesCleaningProcess.java
index 22e4c05ee..5c5fef19c 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DanglingDeleteFilesCleaningProcess.java
@@ -34,12 +34,13 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
-/** Local table process for cleaning Iceberg orphan files. */
-public class OrphanFilesCleaningProcess extends TableProcess implements 
LocalProcess {
+/** Local table process for expiring Iceberg dangling delete files. */
+public class DanglingDeleteFilesCleaningProcess extends TableProcess 
implements LocalProcess {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(OrphanFilesCleaningProcess.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DanglingDeleteFilesCleaningProcess.class);
 
-  public OrphanFilesCleaningProcess(TableRuntime tableRuntime, ExecuteEngine 
engine) {
+  public DanglingDeleteFilesCleaningProcess(TableRuntime tableRuntime, 
ExecuteEngine engine) {
     super(tableRuntime, engine);
   }
 
@@ -53,18 +54,22 @@ public class OrphanFilesCleaningProcess extends 
TableProcess implements LocalPro
     try {
       AmoroTable<?> amoroTable = tableRuntime.loadTable();
       TableMaintainer tableMaintainer = 
TableMaintainerFactory.create(amoroTable, tableRuntime);
-      tableMaintainer.cleanOrphanFiles();
+      tableMaintainer.cleanDanglingDeleteFiles();
       tableRuntime.updateState(
           DefaultTableRuntime.CLEANUP_STATE_KEY,
-          cleanUp -> 
cleanUp.setLastOrphanFilesCleanTime(System.currentTimeMillis()));
+          cleanUp -> 
cleanUp.setLastDanglingDeleteFilesCleanTime(System.currentTimeMillis()));
     } catch (Throwable t) {
-      LOG.error("Failed to clean orphan files for table {}", 
tableRuntime.getTableIdentifier(), t);
+      LOG.error(
+          "unexpected dangling delete files cleaning error of table {}",
+          tableRuntime.getTableIdentifier(),
+          t);
+      throw new RuntimeException(t);
     }
   }
 
   @Override
   public Action getAction() {
-    return IcebergActions.DELETE_ORPHANS;
+    return IcebergActions.CLEAN_DANGLING_DELETE;
   }
 
   @Override
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
index 1f7813476..813892595 100755
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
@@ -65,6 +65,14 @@ public class IcebergProcessFactory implements ProcessFactory 
{
           .durationType()
           .defaultValue(Duration.ofDays(1));
 
+  public static final ConfigOption<Boolean> 
DANGLING_DELETE_FILES_CLEANING_ENABLED =
+      
ConfigOptions.key("clean-dangling-delete-files.enabled").booleanType().defaultValue(true);
+
+  public static final ConfigOption<Duration> 
DANGLING_DELETE_FILES_CLEANING_INTERVAL =
+      ConfigOptions.key("clean-dangling-delete-files.interval")
+          .durationType()
+          .defaultValue(Duration.ofDays(1));
+
   private ExecuteEngine localEngine;
   private final Map<Action, ProcessTriggerStrategy> actions = 
Maps.newHashMap();
   private final List<TableFormat> formats =
@@ -101,6 +109,8 @@ public class IcebergProcessFactory implements 
ProcessFactory {
       return triggerExpireSnapshot(tableRuntime);
     } else if (IcebergActions.DELETE_ORPHANS.equals(action)) {
       return triggerCleanOrphans(tableRuntime);
+    } else if (IcebergActions.CLEAN_DANGLING_DELETE.equals(action)) {
+      return triggerCleanDanglingDelete(tableRuntime);
     }
 
     return Optional.empty();
@@ -130,6 +140,12 @@ public class IcebergProcessFactory implements 
ProcessFactory {
       this.actions.put(
           IcebergActions.DELETE_ORPHANS, 
ProcessTriggerStrategy.triggerAtFixRate(interval));
     }
+
+    if (configs.getBoolean(DANGLING_DELETE_FILES_CLEANING_ENABLED)) {
+      Duration interval = 
configs.getDuration(DANGLING_DELETE_FILES_CLEANING_INTERVAL);
+      this.actions.put(
+          IcebergActions.CLEAN_DANGLING_DELETE, 
ProcessTriggerStrategy.triggerAtFixRate(interval));
+    }
   }
 
   private Optional<TableProcess> triggerExpireSnapshot(TableRuntime 
tableRuntime) {
@@ -162,6 +178,24 @@ public class IcebergProcessFactory implements 
ProcessFactory {
     return Optional.of(new OrphanFilesCleaningProcess(tableRuntime, 
localEngine));
   }
 
+  private Optional<TableProcess> triggerCleanDanglingDelete(TableRuntime 
tableRuntime) {
+    if (localEngine == null
+        || 
!tableRuntime.getTableConfiguration().isDeleteDanglingDeleteFilesEnabled()) {
+      return Optional.empty();
+    }
+
+    long lastExecuteTime =
+        tableRuntime
+            .getState(DefaultTableRuntime.CLEANUP_STATE_KEY)
+            .getLastDanglingDeleteFilesCleanTime();
+    ProcessTriggerStrategy strategy = 
actions.get(IcebergActions.CLEAN_DANGLING_DELETE);
+    if (System.currentTimeMillis() - lastExecuteTime < 
strategy.getTriggerInterval().toMillis()) {
+      return Optional.empty();
+    }
+
+    return Optional.of(new DanglingDeleteFilesCleaningProcess(tableRuntime, 
localEngine));
+  }
+
   @Override
   public void close() {}
 
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
index 22e4c05ee..b17f44f2c 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
@@ -59,6 +59,7 @@ public class OrphanFilesCleaningProcess extends TableProcess 
implements LocalPro
           cleanUp -> 
cleanUp.setLastOrphanFilesCleanTime(System.currentTimeMillis()));
     } catch (Throwable t) {
       LOG.error("Failed to clean orphan files for table {}", 
tableRuntime.getTableIdentifier(), t);
+      throw new RuntimeException(t);
     }
   }
 
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
index 3aea51d9f..e44e8b2ca 100755
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
@@ -58,7 +58,8 @@ public class SnapshotsExpiringProcess extends TableProcess 
implements LocalProce
           DefaultTableRuntime.CLEANUP_STATE_KEY,
           cleanUp -> 
cleanUp.setLastSnapshotsExpiringTime(System.currentTimeMillis()));
     } catch (Throwable t) {
-      LOG.error("unexpected expire error of table {} ", 
tableRuntime.getTableIdentifier(), t);
+      LOG.error("unexpected expire error of table {}", 
tableRuntime.getTableIdentifier(), t);
+      throw new RuntimeException(t);
     }
   }
 
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
deleted file mode 100644
index 3ad669092..000000000
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.scheduler.inline;
-
-import org.apache.amoro.AmoroTable;
-import org.apache.amoro.TableRuntime;
-import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.maintainer.TableMaintainer;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
-import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.server.table.TableService;
-import org.apache.amoro.server.table.cleanup.CleanupOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.concurrent.ThreadLocalRandom;
-
-/** Clean table dangling delete files */
-public class DanglingDeleteFilesCleaningExecutor extends 
PeriodicTableScheduler {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(DanglingDeleteFilesCleaningExecutor.class);
-
-  private final Duration interval;
-
-  protected DanglingDeleteFilesCleaningExecutor(
-      TableService tableService, int poolSize, Duration interval) {
-    super(tableService, poolSize);
-    this.interval = interval;
-  }
-
-  @Override
-  protected long getNextExecutingTime(TableRuntime tableRuntime) {
-    return interval.toMillis();
-  }
-
-  @Override
-  protected boolean shouldExecute(Long lastCleanupEndTime) {
-    return System.currentTimeMillis() - lastCleanupEndTime >= 
interval.toMillis();
-  }
-
-  @Override
-  protected CleanupOperation getCleanupOperation() {
-    return CleanupOperation.DANGLING_DELETE_FILES_CLEANING;
-  }
-
-  @Override
-  protected boolean enabled(TableRuntime tableRuntime) {
-    return tableRuntime instanceof DefaultTableRuntime
-        && 
tableRuntime.getTableConfiguration().isDeleteDanglingDeleteFilesEnabled();
-  }
-
-  @Override
-  public void handleConfigChanged(TableRuntime tableRuntime, 
TableConfiguration originalConfig) {
-    scheduleIfNecessary(tableRuntime, getStartDelay());
-  }
-
-  @Override
-  protected long getExecutorDelay() {
-    return ThreadLocalRandom.current().nextLong(interval.toMillis());
-  }
-
-  @Override
-  protected void execute(TableRuntime tableRuntime) {
-    try {
-      LOG.info("{} start cleaning dangling delete files", 
tableRuntime.getTableIdentifier());
-      AmoroTable<?> amoroTable = loadTable(tableRuntime);
-      TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, 
tableRuntime);
-      tableMaintainer.cleanDanglingDeleteFiles();
-    } catch (Throwable t) {
-      LOG.error("{} failed to clean dangling delete file", 
tableRuntime.getTableIdentifier(), t);
-    }
-  }
-}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
index 4e5b4beb3..4bb45f73a 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
@@ -28,7 +28,6 @@ public class InlineTableExecutors {
 
   private static final InlineTableExecutors instance = new 
InlineTableExecutors();
   private TableRuntimeRefreshExecutor tableRefreshingExecutor;
-  private DanglingDeleteFilesCleaningExecutor 
danglingDeleteFilesCleaningExecutor;
   private BlockerExpiringExecutor blockerExpiringExecutor;
   private OptimizingCommitExecutor optimizingCommitExecutor;
   private ProcessDataExpiringExecutor processDataExpiringExecutor;
@@ -41,13 +40,6 @@ public class InlineTableExecutors {
   }
 
   public void setup(TableService tableService, Configurations conf) {
-    if 
(conf.getBoolean(AmoroManagementConf.CLEAN_DANGLING_DELETE_FILES_ENABLED)) {
-      this.danglingDeleteFilesCleaningExecutor =
-          new DanglingDeleteFilesCleaningExecutor(
-              tableService,
-              
conf.getInteger(AmoroManagementConf.CLEAN_DANGLING_DELETE_FILES_THREAD_COUNT),
-              
conf.get(AmoroManagementConf.CLEAN_DANGLING_DELETE_FILES_INTERVAL));
-    }
     this.optimizingCommitExecutor =
         new OptimizingCommitExecutor(
             tableService, 
conf.getInteger(AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT));
@@ -100,10 +92,6 @@ public class InlineTableExecutors {
     return tableRefreshingExecutor;
   }
 
-  public DanglingDeleteFilesCleaningExecutor 
getDanglingDeleteFilesCleaningExecutor() {
-    return danglingDeleteFilesCleaningExecutor;
-  }
-
   public BlockerExpiringExecutor getBlockerExpiringExecutor() {
     return blockerExpiringExecutor;
   }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index f4a4ef73e..1c6dd25f8 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -342,8 +342,6 @@ public class DefaultTableRuntime extends 
AbstractTableRuntime {
   public long getLastCleanTime(CleanupOperation operation) {
     TableRuntimeCleanupState state = store().getState(CLEANUP_STATE_KEY);
     switch (operation) {
-      case DANGLING_DELETE_FILES_CLEANING:
-        return state.getLastDanglingDeleteFilesCleanTime();
       case DATA_EXPIRING:
         return state.getLastDataExpiringTime();
       case SNAPSHOTS_EXPIRING:
@@ -360,9 +358,6 @@ public class DefaultTableRuntime extends 
AbstractTableRuntime {
             CLEANUP_STATE_KEY,
             state -> {
               switch (operation) {
-                case DANGLING_DELETE_FILES_CLEANING:
-                  state.setLastDanglingDeleteFilesCleanTime(time);
-                  break;
                 case DATA_EXPIRING:
                   state.setLastDataExpiringTime(time);
                   break;
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
index b6597db82..478d607f5 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
@@ -20,7 +20,6 @@ package org.apache.amoro.server.table.cleanup;
 
 /** Table cleanup operation enum. Defines different operation types for table 
cleanup tasks. */
 public enum CleanupOperation {
-  DANGLING_DELETE_FILES_CLEANING,
   DATA_EXPIRING,
   SNAPSHOTS_EXPIRING,
   //  NONE indicates operation types where no cleanup process records are
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
index 639506ea9..21f112584 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
@@ -37,8 +37,10 @@ public class TableRuntimeCleanupState {
     return lastDanglingDeleteFilesCleanTime;
   }
 
-  public void setLastDanglingDeleteFilesCleanTime(long 
lastDanglingDeleteFilesCleanTime) {
+  public TableRuntimeCleanupState setLastDanglingDeleteFilesCleanTime(
+      long lastDanglingDeleteFilesCleanTime) {
     this.lastDanglingDeleteFilesCleanTime = lastDanglingDeleteFilesCleanTime;
+    return this;
   }
 
   public long getLastDataExpiringTime() {
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
index 61b00b833..825410296 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
@@ -46,6 +46,8 @@ public class TestIcebergProcessFactory {
     assertSupportedAction("expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, 
Duration.ofHours(1));
     assertSupportedAction(
         "clean-orphan-files", IcebergActions.DELETE_ORPHANS, 
Duration.ofHours(24));
+    assertSupportedAction(
+        "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, 
Duration.ofHours(24));
   }
 
   @Test
@@ -54,6 +56,11 @@ public class TestIcebergProcessFactory {
         "expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, 
SnapshotsExpiringProcess.class, 0);
     assertTriggerWhenDue(
         "clean-orphan-files", IcebergActions.DELETE_ORPHANS, 
OrphanFilesCleaningProcess.class, 0);
+    assertTriggerWhenDue(
+        "clean-dangling-delete-files",
+        IcebergActions.CLEAN_DANGLING_DELETE,
+        DanglingDeleteFilesCleaningProcess.class,
+        0);
   }
 
   @Test
@@ -62,12 +69,18 @@ public class TestIcebergProcessFactory {
         "expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, 
System.currentTimeMillis());
     assertTriggerNotDue(
         "clean-orphan-files", IcebergActions.DELETE_ORPHANS, 
System.currentTimeMillis());
+    assertTriggerNotDue(
+        "clean-dangling-delete-files",
+        IcebergActions.CLEAN_DANGLING_DELETE,
+        System.currentTimeMillis());
   }
 
   @Test
   public void testTriggerActionDisabled() {
     assertTriggerDisabled("expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, 
false, 0);
     assertTriggerDisabled("clean-orphan-files", IcebergActions.DELETE_ORPHANS, 
false, 0);
+    assertTriggerDisabled(
+        "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, 
false, 0);
   }
 
   private void assertSupportedAction(
@@ -153,6 +166,8 @@ public class TestIcebergProcessFactory {
       tableConfiguration.setExpireSnapshotEnabled(enabled);
     } else if ("clean-orphan-files".equals(configKey)) {
       tableConfiguration.setCleanOrphanEnabled(enabled);
+    } else if ("clean-dangling-delete-files".equals(configKey)) {
+      tableConfiguration.setDeleteDanglingDeleteFilesEnabled(enabled);
     }
 
     TableRuntimeCleanupState cleanupState = new TableRuntimeCleanupState();
@@ -160,11 +175,14 @@ public class TestIcebergProcessFactory {
       cleanupState.setLastSnapshotsExpiringTime(lastTime);
     } else if ("clean-orphan-files".equals(configKey)) {
       cleanupState.setLastOrphanFilesCleanTime(lastTime);
+    } else if ("clean-dangling-delete-files".equals(configKey)) {
+      cleanupState.setLastDanglingDeleteFilesCleanTime(lastTime);
     }
 
     TableRuntime runtime = mock(TableRuntime.class);
     doReturn(tableConfiguration).when(runtime).getTableConfiguration();
     
doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY);
+
     return runtime;
   }
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
index ba2a8767c..f1c13a666 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
@@ -31,7 +31,6 @@ class PeriodicTableSchedulerTestBase extends 
PeriodicTableScheduler {
   private final CleanupOperation cleanupOperation;
   private final boolean enabled;
   private static final long SNAPSHOTS_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 
1 hour
-  private static final long DANGLING_DELETE_FILES_CLEANING_INTERVAL = 24 * 60 
* 60 * 1000L;
   private static final long DATA_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour
 
   public PeriodicTableSchedulerTestBase(
@@ -72,8 +71,6 @@ class PeriodicTableSchedulerTestBase extends 
PeriodicTableScheduler {
     switch (cleanupOperation) {
       case SNAPSHOTS_EXPIRING:
         return currentTime - lastCleanupEndTime >= SNAPSHOTS_EXPIRING_INTERVAL;
-      case DANGLING_DELETE_FILES_CLEANING:
-        return currentTime - lastCleanupEndTime >= 
DANGLING_DELETE_FILES_CLEANING_INTERVAL;
       case DATA_EXPIRING:
         return currentTime - lastCleanupEndTime >= DATA_EXPIRING_INTERVAL;
       default:
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
index 5b50245b6..6c8e05142 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
@@ -25,46 +25,9 @@ import org.mockito.Mockito;
 
 import java.time.Duration;
 
-/**
- * Tests for configurable interval in DanglingDeleteFilesCleaningExecutor and
- * SnapshotsExpiringExecutor.
- */
+/** Tests for configurable interval in SnapshotsExpiringExecutor and 
ProcessDataExpiringExecutor. */
 public class TestConfigurableIntervalExecutors {
 
-  @Test
-  public void testDanglingDeleteFilesDefaultInterval() {
-    Duration interval = Duration.ofDays(1);
-    DanglingDeleteFilesCleaningExecutor executor =
-        new DanglingDeleteFilesCleaningExecutor(null, 1, interval);
-
-    TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
-    Assert.assertEquals(Duration.ofDays(1).toMillis(), 
executor.getNextExecutingTime(tableRuntime));
-  }
-
-  @Test
-  public void testDanglingDeleteFilesCustomInterval() {
-    Duration interval = Duration.ofHours(12);
-    DanglingDeleteFilesCleaningExecutor executor =
-        new DanglingDeleteFilesCleaningExecutor(null, 1, interval);
-
-    TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
-    Assert.assertEquals(
-        Duration.ofHours(12).toMillis(), 
executor.getNextExecutingTime(tableRuntime));
-  }
-
-  @Test
-  public void testDanglingDeleteFilesShouldExecuteAfterInterval() {
-    Duration interval = Duration.ofHours(6);
-    DanglingDeleteFilesCleaningExecutor executor =
-        new DanglingDeleteFilesCleaningExecutor(null, 1, interval);
-
-    long now = System.currentTimeMillis();
-    // 7 hours ago - should execute
-    Assert.assertTrue(executor.shouldExecute(now - 
Duration.ofHours(7).toMillis()));
-    // 5 hours ago - should not execute
-    Assert.assertFalse(executor.shouldExecute(now - 
Duration.ofHours(5).toMillis()));
-  }
-
   @Test
   public void testSnapshotsExpiringDefaultInterval() {
     Duration interval = Duration.ofHours(1);
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
index fef2a3bd4..7bee81602 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
@@ -166,10 +166,7 @@ public class TestPeriodicTableSchedulerCleanup extends 
PersistentBase {
   @Test
   public void testShouldExecuteTaskWithNoPreviousCleanup() {
     List<CleanupOperation> operations =
-        Arrays.asList(
-            CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
-            CleanupOperation.DATA_EXPIRING,
-            CleanupOperation.SNAPSHOTS_EXPIRING);
+        Arrays.asList(CleanupOperation.DATA_EXPIRING, 
CleanupOperation.SNAPSHOTS_EXPIRING);
 
     for (CleanupOperation operation : operations) {
       List<Long> testTableIds = Collections.singletonList(1L);
@@ -190,10 +187,7 @@ public class TestPeriodicTableSchedulerCleanup extends 
PersistentBase {
   @Test
   public void testShouldNotExecuteTaskWithRecentCleanup() {
     List<CleanupOperation> operations =
-        Arrays.asList(
-            CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
-            CleanupOperation.DATA_EXPIRING,
-            CleanupOperation.SNAPSHOTS_EXPIRING);
+        Arrays.asList(CleanupOperation.DATA_EXPIRING, 
CleanupOperation.SNAPSHOTS_EXPIRING);
 
     for (CleanupOperation operation : operations) {
       List<Long> testTableIds = Collections.singletonList(1L);
@@ -219,10 +213,7 @@ public class TestPeriodicTableSchedulerCleanup extends 
PersistentBase {
   @Test
   public void testShouldExecuteTaskWithOldCleanup() {
     List<CleanupOperation> operations =
-        Arrays.asList(
-            CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
-            CleanupOperation.DATA_EXPIRING,
-            CleanupOperation.SNAPSHOTS_EXPIRING);
+        Arrays.asList(CleanupOperation.DATA_EXPIRING, 
CleanupOperation.SNAPSHOTS_EXPIRING);
 
     for (CleanupOperation operation : operations) {
       List<Long> testTableIds = Collections.singletonList(1L);
diff --git a/amoro-common/src/main/java/org/apache/amoro/Action.java 
b/amoro-common/src/main/java/org/apache/amoro/Action.java
index 1d5f81b3b..826006a31 100644
--- a/amoro-common/src/main/java/org/apache/amoro/Action.java
+++ b/amoro-common/src/main/java/org/apache/amoro/Action.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public final class Action {
 
-  private static final int MAX_NAME_LENGTH = 16;
+  private static final int MAX_NAME_LENGTH = 32;
   private static final Map<String, Action> registeredActions = new 
ConcurrentHashMap<>();
 
   private final String name;
diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java 
b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
index 7b8319260..da1791e93 100644
--- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
+++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
@@ -29,4 +29,5 @@ public class IcebergActions {
   public static final Action SYNC_HIVE = Action.register("sync-hive");
   public static final Action EXPIRE_DATA = Action.register("expire-data");
   public static final Action EXPIRE_SNAPSHOTS = 
Action.register("expire-snapshots");
+  public static final Action CLEAN_DANGLING_DELETE = 
Action.register("clean-dangling-delete-files");
 }
diff --git 
a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
 
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
index ed1b26d53..31a9331ee 100644
--- 
a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
+++ 
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
@@ -47,6 +47,7 @@ public class TestLocalExecutionEngine {
   public void testSubmitUsesCustomPoolByTag() throws Exception {
     assertCustomPoolByTag("snapshots-expiring");
     assertCustomPoolByTag("orphan-files-cleaning");
+    assertCustomPoolByTag("clean-dangling-delete-files");
   }
 
   private void assertCustomPoolByTag(String tag) throws Exception {
@@ -154,6 +155,7 @@ public class TestLocalExecutionEngine {
     properties.put("pool.default.thread-count", "1");
     properties.put("pool.snapshots-expiring.thread-count", "1");
     properties.put("pool.orphan-files-cleaning.thread-count", "1");
+    properties.put("pool.clean-dangling-delete-files.thread-count", "1");
     properties.put("process.status.ttl", ttl);
     localEngine.open(properties);
     return localEngine;
diff --git a/charts/amoro/templates/amoro-configmap.yaml 
b/charts/amoro/templates/amoro-configmap.yaml
index d00566f2c..b11d2bed3 100644
--- a/charts/amoro/templates/amoro-configmap.yaml
+++ b/charts/amoro/templates/amoro-configmap.yaml
@@ -103,10 +103,6 @@ data:
         enabled: true
         thread-count: 10
 
-      clean-dangling-delete-files:
-        enabled: true
-        thread-count: 10
-
       sync-hive-tables:
         enabled: false
         thread-count: 10
diff --git a/dist/src/main/amoro-bin/conf/config.yaml 
b/dist/src/main/amoro-bin/conf/config.yaml
index e5fc2654a..e4e9399f2 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -112,10 +112,6 @@ ams:
     enabled: true
     thread-count: 10
 
-  clean-dangling-delete-files:
-    enabled: true
-    thread-count: 10
-
   sync-hive-tables:
     enabled: false
     thread-count: 10
diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml 
b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
index 52e062629..5eefcd798 100755
--- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
@@ -23,3 +23,4 @@ execute-engines:
       pool.default.thread-count: 10
       pool.snapshots-expiring.thread-count: 10
       pool.orphan-files-cleaning.thread-count: 10
+      pool.clean-dangling-delete-files.thread-count: 10
\ No newline at end of file
diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml 
b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
index 5825fde34..ff7719384 100755
--- a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
@@ -24,3 +24,5 @@ process-factories:
       expire-snapshots.interval: "1h"
       clean-orphan-files.enabled: "true"
       clean-orphan-files.interval: "1d"
+      clean-dangling-delete-files.enabled: "true"
+      clean-dangling-delete-files.interval: "1d"
diff --git a/docs/admin-guides/deployment.md b/docs/admin-guides/deployment.md
index 060923b3c..216b1a3e4 100644
--- a/docs/admin-guides/deployment.md
+++ b/docs/admin-guides/deployment.md
@@ -274,6 +274,8 @@ process-factories:
       expire-snapshots.interval: "1h"        # interval for expiring snapshots
       clean-orphan-files.enabled: "true"     # enable orphan files cleaning
       clean-orphan-files.interval: "1d"      # interval for cleaning orphan 
files
+      clean-dangling-delete-files.enabled: "true"     # enable dangling delete 
files cleaning
+      clean-dangling-delete-files.interval: "1d"      # interval for cleaning 
dangling delete files
 ```
 
 {{< hint info >}}
@@ -304,6 +306,7 @@ execute-engines:
       pool.default.thread-count: 10                   # default thread pool 
size
       pool.snapshots-expiring.thread-count: 10        # thread pool for 
snapshot expiration
       pool.orphan-files-cleaning.thread-count: 10     # thread pool for orphan 
file cleaning
+      pool.clean-dangling-delete-files.thread-count: 10     # thread pool for 
dangling delete files cleaning
       process.status.ttl: 4h                          # TTL for process status 
cache
 ```
 
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index aac5c4c42..51428b605 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -49,9 +49,6 @@ table td:last-child, table th:last-child { width: 40%; 
word-break: break-all; }
 | auto-create-tags.thread-count | 3 | The number of threads used for creating 
tags. |
 | blocker.timeout | 1 min | Session timeout. Default unit is milliseconds if 
not specified. |
 | catalog-meta-cache.expiration-interval | 1 min | TTL for catalog metadata. |
-| clean-dangling-delete-files.enabled | true | Enable dangling delete files 
cleaning. |
-| clean-dangling-delete-files.interval | 1 d | Interval for cleaning dangling 
delete files. |
-| clean-dangling-delete-files.thread-count | 10 | The number of threads used 
for dangling delete files cleaning. |
 | data-expiration.enabled | true | Enable data expiration |
 | data-expiration.interval | 1 d | Execute interval for data expiration |
 | data-expiration.thread-count | 10 | The number of threads used for data 
expiring |


Reply via email to