This is an automated email from the ASF dual-hosted git repository.

czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 23fb957da [AMORO-4022] [Improvement]: AMS Iceberg maintainer moved to 
the amoro-iceberg module (#4024)
23fb957da is described below

commit 23fb957da40e3a88b955907adf15bd653fc1148f
Author: ConradJam <[email protected]>
AuthorDate: Wed Jan 7 17:12:17 2026 +0800

    [AMORO-4022] [Improvement]: AMS Iceberg maintainer moved to the 
amoro-iceberg module (#4024)
    
    [AMORO-4022] AMS Iceberg maintainer moved to the amoro-iceberg module 
(#4022)
---
 .../maintainer/DefaultTableMaintainerContext.java  | 109 +++++++++++
 ...aintainers.java => TableMaintainerFactory.java} |  42 ++++-
 .../optimizing/maintainer/TableMaintainers.java    |  30 ++-
 .../DanglingDeleteFilesCleaningExecutor.java       |   2 +-
 .../scheduler/inline/DataExpiringExecutor.java     |   2 +-
 .../inline/OrphanFilesCleaningExecutor.java        |   2 +-
 .../inline/SnapshotsExpiringExecutor.java          |   2 +-
 .../scheduler/inline/TagsAutoCreatingExecutor.java |   2 +-
 .../amoro/server/table/TableOptimizingMetrics.java |   6 +-
 .../table/TableOrphanFilesCleaningMetrics.java     |  14 +-
 .../optimizing/maintainer/TestDataExpire.java      |  30 ++-
 .../optimizing/maintainer/TestOrphanFileClean.java |  41 ++---
 .../maintainer/TestOrphanFileCleanHive.java        |   6 +-
 .../maintainer/TestOrphanFileCleanIceberg.java     |   4 +-
 .../optimizing/maintainer/TestSnapshotExpire.java  | 129 +++++--------
 .../maintainer/TestSnapshotExpireHive.java         |   5 +-
 .../maintainer/TestTableMaintainerContext.java     |  98 ++++++++++
 .../apache/amoro/maintainer/MaintainerMetrics.java |  52 ++++++
 .../apache/amoro/maintainer/OptimizingInfo.java    |  54 ++++++
 .../apache/amoro}/maintainer/TableMaintainer.java  |   3 +-
 .../amoro/maintainer/TableMaintainerContext.java   |  63 +++++++
 .../maintainer/AutoCreateIcebergTagAction.java     |   2 +-
 .../maintainer/IcebergTableMaintainer.java         | 205 ++++++++++++---------
 .../iceberg}/maintainer/MixedTableMaintainer.java  | 152 +++++++++------
 .../formats/iceberg/utils/IcebergTableUtil.java    | 196 ++++++++++++++++++++
 .../formats/iceberg/utils/RollingFileCleaner.java  | 139 ++++++++++++++
 .../maintainer/TestAutoCreateIcebergTagAction.java |  60 +++++-
 27 files changed, 1167 insertions(+), 283 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java
new file mode 100644
index 000000000..ca47e2252
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing.maintainer;
+
+import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.maintainer.MaintainerMetrics;
+import org.apache.amoro.maintainer.OptimizingInfo;
+import org.apache.amoro.maintainer.TableMaintainerContext;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
+import org.apache.amoro.server.utils.HiveLocationUtil;
+import org.apache.amoro.table.MixedTable;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Default implementation of TableMaintainerContext for AMS. Adapts 
DefaultTableRuntime to
+ * TableMaintainerContext interface.
+ */
+public class DefaultTableMaintainerContext implements TableMaintainerContext {
+
+  private final DefaultTableRuntime tableRuntime;
+  private final MixedTable mixedTable;
+
+  public DefaultTableMaintainerContext(DefaultTableRuntime tableRuntime) {
+    this.tableRuntime = tableRuntime;
+    this.mixedTable = null;
+  }
+
+  public DefaultTableMaintainerContext(DefaultTableRuntime tableRuntime, 
MixedTable mixedTable) {
+    this.tableRuntime = tableRuntime;
+    this.mixedTable = mixedTable;
+  }
+
+  @Override
+  public TableConfiguration getTableConfiguration() {
+    return tableRuntime.getTableConfiguration();
+  }
+
+  @Override
+  public MaintainerMetrics getMetrics() {
+    TableOrphanFilesCleaningMetrics metrics = 
tableRuntime.getOrphanFilesCleaningMetrics();
+    return new MaintainerMetrics() {
+      @Override
+      public void recordOrphanDataFilesCleaned(int expected, int cleaned) {
+        metrics.completeOrphanDataFiles(expected, cleaned);
+      }
+
+      @Override
+      public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) {
+        metrics.completeOrphanMetadataFiles(expected, cleaned);
+      }
+    };
+  }
+
+  @Override
+  public OptimizingInfo getOptimizingInfo() {
+    return new DefaultOptimizingInfo(tableRuntime);
+  }
+
+  @Override
+  public Set<String> getHiveLocationPaths() {
+    // Use HiveLocationUtil to get Hive location paths
+    if (mixedTable == null) {
+      return Collections.emptySet();
+    }
+    return HiveLocationUtil.getHiveLocation(mixedTable);
+  }
+
+  /** OptimizingInfo implementation based on DefaultTableRuntime. */
+  private static class DefaultOptimizingInfo implements OptimizingInfo {
+
+    private final DefaultTableRuntime tableRuntime;
+
+    DefaultOptimizingInfo(DefaultTableRuntime tableRuntime) {
+      this.tableRuntime = tableRuntime;
+    }
+
+    @Override
+    public boolean isProcessing() {
+      return tableRuntime.getOptimizingStatus().isProcessing();
+    }
+
+    @Override
+    public long getTargetSnapshotId() {
+      if (!isProcessing()) {
+        return Long.MAX_VALUE;
+      }
+      return tableRuntime.getOptimizingProcess().getTargetSnapshotId();
+    }
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java
similarity index 53%
copy from 
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
copy to 
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java
index 41f49bb23..ec1583fa1 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java
@@ -21,25 +21,53 @@ package org.apache.amoro.server.optimizing.maintainer;
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
+import org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
 import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.table.MixedTable;
 import org.apache.iceberg.Table;
 
-/** Factory for creating {@link TableMaintainer}. */
-public class TableMaintainers {
+/** Factory for creating {@link TableMaintainer} instances. */
+public class TableMaintainerFactory {
 
-  /** Create a {@link TableMaintainer} for the given table. */
+  /**
+   * Create an Iceberg table maintainer with AMS context.
+   *
+   * @param table the Iceberg table
+   * @param tableRuntime the AMS table runtime
+   * @return IcebergTableMaintainer instance
+   */
+  public static IcebergTableMaintainer createIcebergMaintainer(
+      Table table, DefaultTableRuntime tableRuntime) {
+    return new IcebergTableMaintainer(
+        table,
+        tableRuntime.getTableIdentifier().getIdentifier(),
+        new DefaultTableMaintainerContext(tableRuntime));
+  }
+
+  /**
+   * Create a {@link TableMaintainer} for the given table.
+   *
+   * @param amoroTable the Amoro table
+   * @param tableRuntime the table runtime
+   * @return TableMaintainer instance
+   */
   public static TableMaintainer create(AmoroTable<?> amoroTable, TableRuntime 
tableRuntime) {
+    Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+    DefaultTableRuntime runtime = (DefaultTableRuntime) tableRuntime;
     TableFormat format = amoroTable.format();
+
     if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
-      Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+      MixedTable mixedTable = (MixedTable) amoroTable.originalTable();
       return new MixedTableMaintainer(
-          (MixedTable) amoroTable.originalTable(), (DefaultTableRuntime) 
tableRuntime);
+          mixedTable, new DefaultTableMaintainerContext(runtime, mixedTable));
     } else if (TableFormat.ICEBERG.equals(format)) {
-      Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
       return new IcebergTableMaintainer(
-          (Table) amoroTable.originalTable(), amoroTable.id(), 
(DefaultTableRuntime) tableRuntime);
+          (Table) amoroTable.originalTable(),
+          amoroTable.id(),
+          new DefaultTableMaintainerContext(runtime));
     } else {
       throw new RuntimeException("Unsupported table type" + 
amoroTable.originalTable().getClass());
     }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
index 41f49bb23..eb152bb09 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
@@ -21,25 +21,41 @@ package org.apache.amoro.server.optimizing.maintainer;
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
 import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.table.MixedTable;
 import org.apache.iceberg.Table;
 
 /** Factory for creating {@link TableMaintainer}. */
+@Deprecated
 public class TableMaintainers {
 
-  /** Create a {@link TableMaintainer} for the given table. */
+  /**
+   * Create a {@link TableMaintainer} for the given table.
+   *
+   * @deprecated since 0.9.0, will be removed in 0.10.0. Use {@link
+   *     TableMaintainerFactory#create(AmoroTable, TableRuntime)} instead.
+   */
   public static TableMaintainer create(AmoroTable<?> amoroTable, TableRuntime 
tableRuntime) {
+    return TableMaintainerFactory.create(amoroTable, tableRuntime);
+  }
+
+  /**
+   * Create a {@link TableMaintainer} for the given table with 
DefaultTableRuntime.
+   *
+   * @deprecated since 0.9.0, will be removed in 0.10.0. Use {@link
+   *     TableMaintainerFactory#createIcebergMaintainer(Table, 
DefaultTableRuntime)} instead.
+   */
+  public static TableMaintainer create(AmoroTable<?> amoroTable, 
DefaultTableRuntime tableRuntime) {
     TableFormat format = amoroTable.format();
     if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
-      Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+      MixedTable mixedTable = (MixedTable) amoroTable.originalTable();
       return new MixedTableMaintainer(
-          (MixedTable) amoroTable.originalTable(), (DefaultTableRuntime) 
tableRuntime);
+          mixedTable, new DefaultTableMaintainerContext(tableRuntime, 
mixedTable));
     } else if (TableFormat.ICEBERG.equals(format)) {
-      Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
-      return new IcebergTableMaintainer(
-          (Table) amoroTable.originalTable(), amoroTable.id(), 
(DefaultTableRuntime) tableRuntime);
+      return TableMaintainerFactory.createIcebergMaintainer(
+          (Table) amoroTable.originalTable(), tableRuntime);
     } else {
       throw new RuntimeException("Unsupported table type" + 
amoroTable.originalTable().getClass());
     }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
index d7d8801f1..337481439 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
@@ -21,7 +21,7 @@ package org.apache.amoro.server.scheduler.inline;
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
 import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.DefaultTableRuntime;
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
index 61f45860b..05d72761f 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
@@ -21,7 +21,7 @@ package org.apache.amoro.server.scheduler.inline;
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
 import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.TableService;
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
index 21d60cd10..19db0f192 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
@@ -21,7 +21,7 @@ package org.apache.amoro.server.scheduler.inline;
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
 import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.TableService;
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
index 15f2d49d9..d137b98a7 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
@@ -21,7 +21,7 @@ package org.apache.amoro.server.scheduler.inline;
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
 import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.TableService;
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
index 5207f105a..b70015e8e 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
@@ -22,7 +22,7 @@ import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
 import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.TableService;
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
index ef93356d5..a6b2a65df 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
@@ -31,7 +31,6 @@ import org.apache.amoro.metrics.MetricRegistry;
 import org.apache.amoro.optimizing.OptimizingType;
 import org.apache.amoro.server.AmoroServiceConstants;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
-import org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer;
 import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
 import org.apache.amoro.shade.guava32.com.google.common.primitives.Longs;
 import org.apache.iceberg.Snapshot;
@@ -350,7 +349,10 @@ public class TableOptimizingMetrics extends 
AbstractTableMetrics {
     }
     // ignore snapshot which is created by amoro maintain commits or no files 
added
     if (snapshot.summary().values().stream()
-            .anyMatch(IcebergTableMaintainer.AMORO_MAINTAIN_COMMITS::contains)
+            .anyMatch(
+                
org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer
+                        .AMORO_MAINTAIN_COMMITS
+                    ::contains)
         || 
Long.parseLong(snapshot.summary().getOrDefault(SnapshotSummary.ADDED_FILES_PROP,
 "0"))
             == 0) {
       return;
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java
index 9692b01e7..481eb175b 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java
@@ -21,12 +21,14 @@ package org.apache.amoro.server.table;
 import static org.apache.amoro.metrics.MetricDefine.defineCounter;
 
 import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.maintainer.MaintainerMetrics;
 import org.apache.amoro.metrics.Counter;
 import org.apache.amoro.metrics.MetricDefine;
 import org.apache.amoro.metrics.MetricRegistry;
 
 /** Table Orphan Files Cleaning metrics. */
-public class TableOrphanFilesCleaningMetrics extends AbstractTableMetrics {
+public class TableOrphanFilesCleaningMetrics extends AbstractTableMetrics
+    implements MaintainerMetrics {
   private final Counter orphanDataFilesCount = new Counter();
   private final Counter expectedOrphanDataFilesCount = new Counter();
 
@@ -89,4 +91,14 @@ public class TableOrphanFilesCleaningMetrics extends 
AbstractTableMetrics {
     expectedOrphanMetadataFilesCount.inc(expected);
     orphanMetadataFilesCount.inc(cleaned);
   }
+
+  @Override
+  public void recordOrphanDataFilesCleaned(int expected, int cleaned) {
+    completeOrphanDataFiles(expected, cleaned);
+  }
+
+  @Override
+  public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) {
+    completeOrphanMetadataFiles(expected, cleaned);
+  }
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
index 75b3e9a60..737bd3878 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
@@ -31,6 +31,8 @@ import org.apache.amoro.config.DataExpirationConfig;
 import org.apache.amoro.data.ChangeAction;
 import org.apache.amoro.data.DataFileType;
 import org.apache.amoro.data.PrimaryKeyedFile;
+import org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
 import org.apache.amoro.io.MixedDataTestHelpers;
 import org.apache.amoro.optimizing.scan.KeyedTableFileScanHelper;
 import org.apache.amoro.optimizing.scan.TableFileScanHelper;
@@ -291,7 +293,8 @@ public class TestDataExpire extends ExecutorTestBase {
 
     // expire partitions that order than 2022-01-02 18:00:00.000
     DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(keyedTable, null);
+    MixedTableMaintainer tableMaintainer =
+        new MixedTableMaintainer(keyedTable, 
TestTableMaintainerContext.of(keyedTable));
     tableMaintainer.expireDataFrom(
         config,
         LocalDateTime.parse("2022-01-03T18:00:00.000")
@@ -380,7 +383,8 @@ public class TestDataExpire extends ExecutorTestBase {
 
     // expire partitions that order than 2022-01-02 18:00:00.000
     DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
-    MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(getMixedTable(), null);
+    MixedTableMaintainer mixedTableMaintainer =
+        new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(getMixedTable()));
     mixedTableMaintainer.expireDataFrom(
         config,
         LocalDateTime.parse("2022-01-03T18:00:00.000")
@@ -454,14 +458,18 @@ public class TestDataExpire extends ExecutorTestBase {
     if (getTestFormat().equals(TableFormat.ICEBERG)) {
       Table table = getMixedTable().asUnkeyedTable();
       IcebergTableMaintainer icebergTableMaintainer =
-          new IcebergTableMaintainer(table, getMixedTable().id(), null);
+          new IcebergTableMaintainer(
+              table,
+              getMixedTable().id(),
+              TestTableMaintainerContext.of(getMixedTable().asUnkeyedTable()));
       Types.NestedField field = 
table.schema().findField(config.getExpirationField());
       long lastSnapshotTime = table.currentSnapshot().timestampMillis();
       long lastCommitTime = icebergTableMaintainer.expireBaseOnRule(config, 
field).toEpochMilli();
       Assert.assertEquals(lastSnapshotTime, lastCommitTime);
     } else {
       MixedTable mixedTable = getMixedTable();
-      MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(mixedTable, null);
+      MixedTableMaintainer mixedTableMaintainer =
+          new MixedTableMaintainer(mixedTable, 
TestTableMaintainerContext.of(mixedTable));
       Types.NestedField field = 
getMixedTable().schema().findField(config.getExpirationField());
 
       long lastSnapshotTime;
@@ -488,7 +496,10 @@ public class TestDataExpire extends ExecutorTestBase {
     if (getTestFormat().equals(TableFormat.ICEBERG)) {
       Table table = getMixedTable().asUnkeyedTable();
       IcebergTableMaintainer icebergTableMaintainer =
-          new IcebergTableMaintainer(table, getMixedTable().id(), null);
+          new IcebergTableMaintainer(
+              table,
+              getMixedTable().id(),
+              TestTableMaintainerContext.of(getMixedTable().asUnkeyedTable()));
       Types.NestedField field = 
table.schema().findField(config.getExpirationField());
       icebergTableMaintainer.expireDataFrom(
           config,
@@ -500,7 +511,8 @@ public class TestDataExpire extends ExecutorTestBase {
                           
getMixedTable().schema().findField(config.getExpirationField())))
                   .toInstant());
     } else {
-      MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(getMixedTable(), null);
+      MixedTableMaintainer mixedTableMaintainer =
+          new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(getMixedTable()));
       Types.NestedField field = 
getMixedTable().schema().findField(config.getExpirationField());
       mixedTableMaintainer.expireDataFrom(
           config,
@@ -562,7 +574,8 @@ public class TestDataExpire extends ExecutorTestBase {
     assertScanResult(scan, 1, 0);
 
     DataExpirationConfig config = parseDataExpirationConfig(testTable);
-    MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(getMixedTable(), null);
+    MixedTableMaintainer mixedTableMaintainer =
+        new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(getMixedTable()));
     mixedTableMaintainer.expireDataFrom(
         config,
         LocalDateTime.parse("2024-01-01T00:00:00.000")
@@ -829,7 +842,8 @@ public class TestDataExpire extends ExecutorTestBase {
             .writeChangeStore(keyedTable, 1L, ChangeAction.INSERT, 
changeRecords, false));
 
     DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(keyedTable, null);
+    MixedTableMaintainer tableMaintainer =
+        new MixedTableMaintainer(keyedTable, 
TestTableMaintainerContext.of(keyedTable));
     tableMaintainer.expireDataFrom(
         config,
         LocalDateTime.parse("2022-01-04T12:00:00.000")
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
index da78da32e..1326bacdd 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
@@ -18,8 +18,8 @@
 
 package org.apache.amoro.server.optimizing.maintainer;
 
-import static 
org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.DATA_FOLDER_NAME;
-import static 
org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.FLINK_JOB_ID;
+import static 
org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.DATA_FOLDER_NAME;
+import static 
org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.FLINK_JOB_ID;
 
 import org.apache.amoro.BasicTableTestHelper;
 import org.apache.amoro.ServerTableIdentifier;
@@ -27,9 +27,8 @@ import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.catalog.BasicCatalogTestHelper;
 import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
 import org.apache.amoro.server.scheduler.inline.ExecutorTestBase;
-import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.server.table.TableConfigurations;
 import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
 import org.apache.amoro.table.TableIdentifier;
 import org.apache.amoro.table.TableProperties;
@@ -47,7 +46,6 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
@@ -119,7 +117,8 @@ public class TestOrphanFileClean extends ExecutorTestBase {
                 tableIdentifier.getDatabase(),
                 tableIdentifier.getTableName(),
                 getTestFormat()));
-    MixedTableMaintainer maintainer = new 
MixedTableMaintainer(getMixedTable(), null);
+    MixedTableMaintainer maintainer =
+        new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(getMixedTable()));
     maintainer.cleanContentFiles(
         System.currentTimeMillis()
             - TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME_DEFAULT * 60 * 
1000,
@@ -203,7 +202,8 @@ public class TestOrphanFileClean extends ExecutorTestBase {
       
Assert.assertTrue(getMixedTable().io().exists(changeInvalidMetadataJson));
     }
 
-    MixedTableMaintainer maintainer = new 
MixedTableMaintainer(getMixedTable(), null);
+    MixedTableMaintainer maintainer =
+        new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(getMixedTable()));
     TableIdentifier tableIdentifier = getMixedTable().id();
     TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
         new TableOrphanFilesCleaningMetrics(
@@ -296,7 +296,8 @@ public class TestOrphanFileClean extends ExecutorTestBase {
       
Assert.assertTrue(getMixedTable().io().exists(changeInvalidMetadataJson));
     }
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(getMixedTable(), null);
+    MixedTableMaintainer tableMaintainer =
+        new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(getMixedTable()));
     TableIdentifier tableIdentifier = getMixedTable().id();
     TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
         new TableOrphanFilesCleaningMetrics(
@@ -345,9 +346,9 @@ public class TestOrphanFileClean extends ExecutorTestBase {
     Assert.assertTrue(unkeyedTable.io().exists(file1.path()));
     Assert.assertTrue(unkeyedTable.io().exists(file2.path()));
     Assert.assertTrue(unkeyedTable.io().exists(file3.path()));
-    new MixedTableMaintainer(getMixedTable(), null)
+    new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(getMixedTable()))
         .cleanContentFiles(System.currentTimeMillis() + 1, 
orphanFilesCleaningMetrics);
-    new MixedTableMaintainer(getMixedTable(), null)
+    new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(getMixedTable()))
         .cleanMetadata(System.currentTimeMillis() + 1, 
orphanFilesCleaningMetrics);
     Assert.assertTrue(unkeyedTable.io().exists(file1.path()));
     Assert.assertTrue(unkeyedTable.io().exists(file2.path()));
@@ -395,28 +396,16 @@ public class TestOrphanFileClean extends ExecutorTestBase 
{
       Assert.assertTrue(getMixedTable().io().exists(changeOrphanFilePath));
     }
 
-    DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
-    Mockito.when(tableRuntime.getTableIdentifier())
-        .thenReturn(ServerTableIdentifier.of(baseTable.id(), getTestFormat()));
-    Mockito.when(tableRuntime.getTableConfiguration())
-        
.thenReturn(TableConfigurations.parseTableConfig(baseTable.properties()));
-    Mockito.when(tableRuntime.getTableConfiguration())
-        
.thenReturn(TableConfigurations.parseTableConfig(baseTable.properties()));
-
-    Mockito.when(tableRuntime.getOrphanFilesCleaningMetrics())
-        .thenReturn(
-            new TableOrphanFilesCleaningMetrics(
-                ServerTableIdentifier.of(baseTable.id(), getTestFormat())));
-
-    MixedTableMaintainer maintainer = new 
MixedTableMaintainer(getMixedTable(), tableRuntime);
+    MixedTableMaintainer maintainer =
+        new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(getMixedTable()));
     maintainer.cleanOrphanFiles();
 
     Assert.assertTrue(getMixedTable().io().exists(baseOrphanFileDir));
     Assert.assertTrue(getMixedTable().io().exists(baseOrphanFilePath));
 
     baseTable.updateProperties().set("gc.enabled", "true").commit();
-    Mockito.when(tableRuntime.getTableConfiguration())
-        
.thenReturn(TableConfigurations.parseTableConfig((baseTable.properties())));
+    maintainer =
+        new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(getMixedTable()));
     maintainer.cleanOrphanFiles();
 
     Assert.assertFalse(getMixedTable().io().exists(baseOrphanFileDir));
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
index e55f33ee8..729999525 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
@@ -18,12 +18,13 @@
 
 package org.apache.amoro.server.optimizing.maintainer;
 
-import static 
org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.DATA_FOLDER_NAME;
+import static 
org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.DATA_FOLDER_NAME;
 
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
 import org.apache.amoro.hive.TestHMS;
 import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
 import org.apache.amoro.hive.catalog.HiveTableTestHelper;
@@ -84,7 +85,8 @@ public class TestOrphanFileCleanHive extends 
TestOrphanFileClean {
     changeOrphanDataFile.createOrOverwrite().close();
     Assert.assertTrue(getMixedTable().io().exists(hiveOrphanFilePath));
 
-    MixedTableMaintainer maintainer = new 
MixedTableMaintainer(getMixedTable(), null);
+    MixedTableMaintainer maintainer =
+        new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(getMixedTable()));
     TableIdentifier tableIdentifier = getMixedTable().id();
     TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
         new TableOrphanFilesCleaningMetrics(
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
index 60bbd4853..c882187af 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
@@ -23,6 +23,7 @@ import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.catalog.BasicCatalogTestHelper;
 import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer;
 import org.apache.amoro.hive.io.writer.AdaptHiveGenericTaskWriterBuilder;
 import org.apache.amoro.io.writer.SortedPosDeleteWriter;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
@@ -101,7 +102,8 @@ public class TestOrphanFileCleanIceberg extends 
TestOrphanFileClean {
     assertDanglingDeleteFiles(testTable, 1);
 
     IcebergTableMaintainer tableMaintainer =
-        new IcebergTableMaintainer(testTable, testTable.id(), null);
+        new IcebergTableMaintainer(
+            testTable, testTable.id(), 
TestTableMaintainerContext.of(testTable));
     tableMaintainer.doCleanDanglingDeleteFiles();
 
     assertDanglingDeleteFiles(testTable, 0);
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
index 9f04c4ee2..1a391a404 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
@@ -18,20 +18,18 @@
 
 package org.apache.amoro.server.optimizing.maintainer;
 
-import static 
org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.FLINK_MAX_COMMITTED_CHECKPOINT_ID;
+import static 
org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.FLINK_MAX_COMMITTED_CHECKPOINT_ID;
 import static 
org.apache.amoro.utils.MixedTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST;
 
 import org.apache.amoro.BasicTableTestHelper;
-import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.catalog.BasicCatalogTestHelper;
 import org.apache.amoro.catalog.CatalogTestHelper;
 import org.apache.amoro.data.ChangeAction;
-import org.apache.amoro.server.optimizing.OptimizingProcess;
-import org.apache.amoro.server.optimizing.OptimizingStatus;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
+import org.apache.amoro.maintainer.OptimizingInfo;
 import org.apache.amoro.server.scheduler.inline.ExecutorTestBase;
-import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableConfigurations;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterators;
@@ -55,7 +53,6 @@ import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -115,7 +112,8 @@ public class TestSnapshotExpire extends ExecutorTestBase {
         file ->
             
Assert.assertTrue(testKeyedTable.changeTable().io().exists(file.path().toString())));
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testKeyedTable, null);
+    MixedTableMaintainer tableMaintainer =
+        new MixedTableMaintainer(testKeyedTable, 
TestTableMaintainerContext.of(testKeyedTable));
     tableMaintainer.getChangeMaintainer().expireFiles(l + 1);
 
     // In order to advance the snapshot
@@ -166,18 +164,11 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     Snapshot lastSnapshot = testKeyedTable.changeTable().currentSnapshot();
 
     testKeyedTable.updateProperties().set(TableProperties.CHANGE_DATA_TTL, 
"0").commit();
-    DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
-    Mockito.when(tableRuntime.getTableIdentifier())
-        .thenReturn(ServerTableIdentifier.of(testKeyedTable.id(), 
getTestFormat()));
-    
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
-    Mockito.when(tableRuntime.getTableConfiguration())
-        
.thenReturn(TableConfigurations.parseTableConfig((testKeyedTable.properties())));
-    Mockito.when(tableRuntime.getTableConfiguration())
-        
.thenReturn(TableConfigurations.parseTableConfig((testKeyedTable.properties())));
 
     Assert.assertEquals(5, 
Iterables.size(testKeyedTable.changeTable().snapshots()));
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testKeyedTable, tableRuntime);
+    MixedTableMaintainer tableMaintainer =
+        new MixedTableMaintainer(testKeyedTable, 
TestTableMaintainerContext.of(testKeyedTable));
     tableMaintainer.expireSnapshots();
 
     Assert.assertEquals(2, 
Iterables.size(testKeyedTable.changeTable().snapshots()));
@@ -210,16 +201,11 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     Snapshot lastSnapshot = table.currentSnapshot();
 
     table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, 
"0").commit();
-    DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
-    Mockito.when(tableRuntime.getTableIdentifier())
-        .thenReturn(ServerTableIdentifier.of(table.id(), getTestFormat()));
-    
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
-    Mockito.when(tableRuntime.getTableConfiguration())
-        .thenReturn(TableConfigurations.parseTableConfig(table.properties()));
 
     Assert.assertEquals(4, Iterables.size(table.snapshots()));
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(getMixedTable(), tableRuntime);
+    MixedTableMaintainer tableMaintainer =
+        new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(table));
     tableMaintainer.expireSnapshots();
 
     Assert.assertEquals(2, Iterables.size(table.snapshots()));
@@ -249,18 +235,11 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     Snapshot lastSnapshot = table.currentSnapshot();
 
     table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, 
"0").commit();
-    DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
-    Mockito.when(tableRuntime.getTableIdentifier())
-        .thenReturn(ServerTableIdentifier.of(table.id(), getTestFormat()));
-    
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
-    Mockito.when(tableRuntime.getTableConfiguration())
-        .thenReturn(TableConfigurations.parseTableConfig(table.properties()));
-    Mockito.when(tableRuntime.getTableConfiguration())
-        .thenReturn(TableConfigurations.parseTableConfig(table.properties()));
 
     Assert.assertEquals(4, Iterables.size(table.snapshots()));
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(getMixedTable(), tableRuntime);
+    MixedTableMaintainer tableMaintainer =
+        new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(table));
     tableMaintainer.expireSnapshots();
 
     Assert.assertEquals(2, Iterables.size(table.snapshots()));
@@ -281,24 +260,29 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     table.newAppend().commit();
     table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, 
"0").commit();
 
-    DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
-    Mockito.when(tableRuntime.getTableIdentifier())
-        .thenReturn(ServerTableIdentifier.of(table.id(), getTestFormat()));
-    
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
-    Mockito.when(tableRuntime.getTableConfiguration())
-        .thenReturn(TableConfigurations.parseTableConfig(table.properties()));
-
-    new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
+    TestTableMaintainerContext.Impl context =
+        new TestTableMaintainerContext.Impl(
+            TableConfigurations.parseTableConfig(table.properties()), table);
+    new MixedTableMaintainer(table, context).expireSnapshots();
     Assert.assertEquals(1, Iterables.size(table.snapshots()));
 
     table.newAppend().commit();
 
-    // mock tableRuntime which has optimizing task not committed
+    // mock optimizing task not committed
     long optimizeSnapshotId = table.currentSnapshot().snapshotId();
-    OptimizingProcess optimizingProcess = 
Mockito.mock(OptimizingProcess.class);
-    
Mockito.when(optimizingProcess.getTargetSnapshotId()).thenReturn(optimizeSnapshotId);
-    
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.COMMITTING);
-    
Mockito.when(tableRuntime.getOptimizingProcess()).thenReturn(optimizingProcess);
+    OptimizingInfo optimizingInfo =
+        new OptimizingInfo() {
+          @Override
+          public boolean isProcessing() {
+            return true;
+          }
+
+          @Override
+          public long getTargetSnapshotId() {
+            return optimizeSnapshotId;
+          }
+        };
+    context.setOptimizingInfo(optimizingInfo);
     List<Snapshot> expectedSnapshots = new ArrayList<>();
     expectedSnapshots.add(table.currentSnapshot());
 
@@ -307,7 +291,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     table.newAppend().commit();
     expectedSnapshots.add(table.currentSnapshot());
 
-    new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
+    new MixedTableMaintainer(table, context).expireSnapshots();
     Assert.assertEquals(3, Iterables.size(table.snapshots()));
     Assert.assertTrue(
         Iterators.elementsEqual(expectedSnapshots.iterator(), 
table.snapshots().iterator()));
@@ -331,7 +315,9 @@ public class TestSnapshotExpire extends ExecutorTestBase {
 
     List<DataFile> newDataFiles = writeAndCommitBaseStore(table);
     Assert.assertEquals(3, Iterables.size(table.snapshots()));
-    new MixedTableMaintainer(table, 
null).expireSnapshots(System.currentTimeMillis(), 1);
+    new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(getMixedTable()))
+        .getBaseMaintainer()
+        .expireSnapshots(System.currentTimeMillis(), 1);
     Assert.assertEquals(1, Iterables.size(table.snapshots()));
 
     dataFiles.forEach(file -> 
Assert.assertFalse(table.io().exists(file.path().toString())));
@@ -379,7 +365,8 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     Assert.assertEquals(12, 
Iterables.size(testKeyedTable.changeTable().newScan().planFiles()));
     Assert.assertEquals(3, 
Iterables.size(testKeyedTable.changeTable().snapshots()));
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testKeyedTable, null);
+    MixedTableMaintainer tableMaintainer =
+        new MixedTableMaintainer(testKeyedTable, 
TestTableMaintainerContext.of(testKeyedTable));
     tableMaintainer.getChangeMaintainer().expireFiles(secondCommitTime + 1);
     tableMaintainer.getChangeMaintainer().expireSnapshots(secondCommitTime + 
1, 1);
 
@@ -447,7 +434,9 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     Assert.assertTrue(baseTable.io().exists(file1.path()));
     Assert.assertTrue(baseTable.io().exists(file2.path()));
     Assert.assertTrue(baseTable.io().exists(file3.path()));
-    new MixedTableMaintainer(testKeyedTable, null).expireSnapshots(expireTime, 
1);
+    new MixedTableMaintainer(testKeyedTable, 
TestTableMaintainerContext.of(testKeyedTable))
+        .getBaseMaintainer()
+        .expireSnapshots(expireTime, 1);
 
     Assert.assertEquals(1, Iterables.size(baseTable.snapshots()));
     Assert.assertFalse(baseTable.io().exists(file1.path()));
@@ -467,24 +456,15 @@ public class TestSnapshotExpire extends ExecutorTestBase {
 
     Assert.assertEquals(2, 
Iterables.size(testKeyedTable.changeTable().snapshots()));
 
-    DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
-    Mockito.when(tableRuntime.getTableIdentifier())
-        .thenReturn(ServerTableIdentifier.of(testKeyedTable.id(), 
getTestFormat()));
-    
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
-    Mockito.when(tableRuntime.getTableConfiguration())
-        
.thenReturn(TableConfigurations.parseTableConfig(testKeyedTable.properties()));
-    Mockito.when(tableRuntime.getTableConfiguration())
-        
.thenReturn(TableConfigurations.parseTableConfig(testKeyedTable.properties()));
-
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testKeyedTable, tableRuntime);
+    MixedTableMaintainer tableMaintainer =
+        new MixedTableMaintainer(testKeyedTable, 
TestTableMaintainerContext.of(testKeyedTable));
     testKeyedTable.updateProperties().set(TableProperties.CHANGE_DATA_TTL, 
"0").commit();
     tableMaintainer.expireSnapshots();
     Assert.assertEquals(2, 
Iterables.size(testKeyedTable.changeTable().snapshots()));
 
     testKeyedTable.updateProperties().set("gc.enabled", "true").commit();
-    Mockito.when(tableRuntime.getTableConfiguration())
-        
.thenReturn(TableConfigurations.parseTableConfig(testKeyedTable.properties()));
-    tableMaintainer = new MixedTableMaintainer(testKeyedTable, tableRuntime);
+    tableMaintainer =
+        new MixedTableMaintainer(testKeyedTable, 
TestTableMaintainerContext.of(testKeyedTable));
     tableMaintainer.expireSnapshots();
     Assert.assertEquals(1, 
Iterables.size(testKeyedTable.changeTable().snapshots()));
   }
@@ -500,22 +480,16 @@ public class TestSnapshotExpire extends ExecutorTestBase {
 
     Assert.assertEquals(2, Iterables.size(testUnkeyedTable.snapshots()));
 
-    DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
-    Mockito.when(tableRuntime.getTableIdentifier())
-        .thenReturn(ServerTableIdentifier.of(testUnkeyedTable.id(), 
getTestFormat()));
-    
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
+    MixedTableMaintainer tableMaintainer =
+        new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(testUnkeyedTable));
     
testUnkeyedTable.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, 
"0").commit();
-    Mockito.when(tableRuntime.getTableConfiguration())
-        
.thenReturn(TableConfigurations.parseTableConfig(testUnkeyedTable.properties()));
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testUnkeyedTable, tableRuntime);
     tableMaintainer.expireSnapshots();
     Assert.assertEquals(2, Iterables.size(testUnkeyedTable.snapshots()));
 
     testUnkeyedTable.updateProperties().set("gc.enabled", "true").commit();
-    Mockito.when(tableRuntime.getTableConfiguration())
-        
.thenReturn(TableConfigurations.parseTableConfig(testUnkeyedTable.properties()));
-    tableMaintainer = new MixedTableMaintainer(testUnkeyedTable, tableRuntime);
+    tableMaintainer =
+        new MixedTableMaintainer(getMixedTable(), 
TestTableMaintainerContext.of(testUnkeyedTable));
     tableMaintainer.expireSnapshots();
     Assert.assertEquals(1, Iterables.size(testUnkeyedTable.snapshots()));
   }
@@ -534,14 +508,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, 
"0s").commit();
     table.updateProperties().set(TableProperties.SNAPSHOT_MIN_COUNT, 
"3").commit();
 
-    DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
-    Mockito.when(tableRuntime.getTableIdentifier())
-        .thenReturn(ServerTableIdentifier.of(table.id(), getTestFormat()));
-    Mockito.when(tableRuntime.getTableConfiguration())
-        .thenReturn(TableConfigurations.parseTableConfig(table.properties()));
-    
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
-
-    new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
+    new MixedTableMaintainer(table, 
TestTableMaintainerContext.of(table)).expireSnapshots();
     Assert.assertEquals(2, Iterables.size(table.snapshots()));
 
     table.newAppend().commit();
@@ -549,7 +516,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     table.newAppend().commit();
     expectedSnapshots.add(table.currentSnapshot());
 
-    new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
+    new MixedTableMaintainer(table, 
TestTableMaintainerContext.of(table)).expireSnapshots();
     Assert.assertEquals(3, Iterables.size(table.snapshots()));
     Assert.assertTrue(
         Iterators.elementsEqual(expectedSnapshots.iterator(), 
table.snapshots().iterator()));
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
index 16539fa01..b1628808c 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
@@ -21,11 +21,13 @@ package org.apache.amoro.server.optimizing.maintainer;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
 import org.apache.amoro.hive.TestHMS;
 import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
 import org.apache.amoro.hive.catalog.HiveTableTestHelper;
 import org.apache.amoro.hive.io.HiveDataTestHelpers;
 import org.apache.amoro.hive.utils.HiveTableUtil;
+import org.apache.amoro.maintainer.TableMaintainerContext;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.table.MixedTable;
@@ -111,7 +113,8 @@ public class TestSnapshotExpireHive extends 
TestSnapshotExpire {
         isKeyedTable()
             ? getMixedTable().asKeyedTable().baseTable()
             : getMixedTable().asUnkeyedTable();
-    MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(getMixedTable(), null);
+    TableMaintainerContext context = 
TestTableMaintainerContext.of(getMixedTable());
+    MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(getMixedTable(), context);
     
mixedTableMaintainer.getBaseMaintainer().expireSnapshots(System.currentTimeMillis(),
 1);
     Assert.assertEquals(1, Iterables.size(unkeyedTable.snapshots()));
 
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestTableMaintainerContext.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestTableMaintainerContext.java
new file mode 100644
index 000000000..ea2ab96d2
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestTableMaintainerContext.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing.maintainer;
+
+import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.maintainer.MaintainerMetrics;
+import org.apache.amoro.maintainer.OptimizingInfo;
+import org.apache.amoro.maintainer.TableMaintainerContext;
+import org.apache.amoro.server.table.TableConfigurations;
+import org.apache.amoro.server.utils.HiveLocationUtil;
+import org.apache.amoro.table.KeyedTable;
+import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.UnkeyedTable;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** Utility class for creating test TableMaintainerContext instances. */
+public class TestTableMaintainerContext {
+
+  /** Create a test TableMaintainerContext for the given MixedTable. */
+  public static TableMaintainerContext of(MixedTable table) {
+    return new Impl(TableConfigurations.parseTableConfig(table.properties()), 
table);
+  }
+
+  /** Create a test TableMaintainerContext for the given KeyedTable. */
+  public static TableMaintainerContext of(KeyedTable table) {
+    return new Impl(TableConfigurations.parseTableConfig(table.properties()));
+  }
+
+  /** Create a test TableMaintainerContext for the given UnkeyedTable. */
+  public static TableMaintainerContext of(UnkeyedTable table) {
+    return new Impl(TableConfigurations.parseTableConfig(table.properties()));
+  }
+
+  /** Test implementation of TableMaintainerContext. */
+  public static class Impl implements TableMaintainerContext {
+    private final TableConfiguration tableConfiguration;
+    private OptimizingInfo optimizingInfo;
+    private final MixedTable mixedTable;
+
+    public Impl(TableConfiguration tableConfiguration) {
+      this.tableConfiguration = tableConfiguration;
+      this.optimizingInfo = OptimizingInfo.EMPTY;
+      this.mixedTable = null;
+    }
+
+    public Impl(TableConfiguration tableConfiguration, MixedTable mixedTable) {
+      this.tableConfiguration = tableConfiguration;
+      this.optimizingInfo = OptimizingInfo.EMPTY;
+      this.mixedTable = mixedTable;
+    }
+
+    public void setOptimizingInfo(OptimizingInfo optimizingInfo) {
+      this.optimizingInfo = optimizingInfo;
+    }
+
+    @Override
+    public TableConfiguration getTableConfiguration() {
+      return tableConfiguration;
+    }
+
+    @Override
+    public MaintainerMetrics getMetrics() {
+      return MaintainerMetrics.NOOP;
+    }
+
+    @Override
+    public OptimizingInfo getOptimizingInfo() {
+      return optimizingInfo;
+    }
+
+    @Override
+    public Set<String> getHiveLocationPaths() {
+      // Use HiveLocationUtil to get Hive location paths
+      if (mixedTable == null) {
+        return Collections.emptySet();
+      }
+      return HiveLocationUtil.getHiveLocation(mixedTable);
+    }
+  }
+}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java 
b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java
new file mode 100644
index 000000000..420c61e3b
--- /dev/null
+++ 
b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.maintainer;
+
+/**
+ * Metrics collector interface for table maintenance operations. 
Implementations can collect metrics
+ * to different monitoring systems.
+ */
+public interface MaintainerMetrics {
+
+  /**
+   * Record orphan data files cleaning result.
+   *
+   * @param expected expected number of files to clean
+   * @param cleaned actual number of files cleaned
+   */
+  void recordOrphanDataFilesCleaned(int expected, int cleaned);
+
+  /**
+   * Record orphan metadata files cleaning result.
+   *
+   * @param expected expected number of files to clean
+   * @param cleaned actual number of files cleaned
+   */
+  void recordOrphanMetadataFilesCleaned(int expected, int cleaned);
+
+  /** No-op implementation that does nothing. */
+  MaintainerMetrics NOOP =
+      new MaintainerMetrics() {
+        @Override
+        public void recordOrphanDataFilesCleaned(int expected, int cleaned) {}
+
+        @Override
+        public void recordOrphanMetadataFilesCleaned(int expected, int 
cleaned) {}
+      };
+}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/maintainer/OptimizingInfo.java 
b/amoro-common/src/main/java/org/apache/amoro/maintainer/OptimizingInfo.java
new file mode 100644
index 000000000..3056421ba
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/OptimizingInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.maintainer;
+
+/**
+ * Optimizing process information interface. Provides information about 
ongoing optimizing
+ * processes.
+ */
+public interface OptimizingInfo {
+
+  /**
+   * Check if there is an optimizing process currently processing.
+   *
+   * @return true if an optimizing process is in progress
+   */
+  boolean isProcessing();
+
+  /**
+   * Get the target snapshot id that the optimizing process is based on.
+   *
+   * @return target snapshot id, or {@code Long.MAX_VALUE} if no process is 
running
+   */
+  long getTargetSnapshotId();
+
+  /** Empty implementation indicating no optimizing process. */
+  OptimizingInfo EMPTY =
+      new OptimizingInfo() {
+        @Override
+        public boolean isProcessing() {
+          return false;
+        }
+
+        @Override
+        public long getTargetSnapshotId() {
+          return Long.MAX_VALUE;
+        }
+      };
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
 b/amoro-common/src/main/java/org/apache/amoro/maintainer/TableMaintainer.java
similarity index 92%
rename from 
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
rename to 
amoro-common/src/main/java/org/apache/amoro/maintainer/TableMaintainer.java
index 9474102b2..6719e1507 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
+++ 
b/amoro-common/src/main/java/org/apache/amoro/maintainer/TableMaintainer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.server.optimizing.maintainer;
+package org.apache.amoro.maintainer;
 
 /**
  * API for maintaining table.
@@ -24,7 +24,6 @@ package org.apache.amoro.server.optimizing.maintainer;
  * <p>Includes: clean content files, clean metadata, clean dangling delete 
files, expire snapshots,
  * auto create tags.
  */
-// TODO TableMaintainer should not be in this optimizing.xxx package.
 public interface TableMaintainer {
 
   /** Clean table orphan files. Includes: data files, metadata files. */
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/maintainer/TableMaintainerContext.java
 
b/amoro-common/src/main/java/org/apache/amoro/maintainer/TableMaintainerContext.java
new file mode 100644
index 000000000..4ebdf7360
--- /dev/null
+++ 
b/amoro-common/src/main/java/org/apache/amoro/maintainer/TableMaintainerContext.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.maintainer;
+
+import org.apache.amoro.config.TableConfiguration;
+
+import java.util.Set;
+
+/**
+ * Context interface for table maintainer operations.
+ *
+ * <p>Provides necessary configuration and runtime information for table 
maintenance without
+ * depending on AMS-specific implementations.
+ */
+public interface TableMaintainerContext {
+
+  /**
+   * Get the table configuration.
+   *
+   * @return table configuration containing all maintenance-related settings
+   */
+  TableConfiguration getTableConfiguration();
+
+  /**
+   * Get the metrics collector for maintenance operations.
+   *
+   * @return metrics collector, may return NoopMaintainerMetrics if metrics 
not supported
+   */
+  MaintainerMetrics getMetrics();
+
+  /**
+   * Get optimizing process information if available.
+   *
+   * @return optimizing information, may return EmptyOptimizingInfo if no 
optimizing process
+   */
+  OptimizingInfo getOptimizingInfo();
+
+  /**
+   * Get Hive table/partition location paths if the table is a Hive-backed 
table.
+   *
+   * <p>This is used to exclude Hive-managed files from being cleaned during 
maintenance operations.
+   * For non-Hive tables, returns an empty set.
+   *
+   * @return set of Hive location paths, or empty set if not a Hive table
+   */
+  Set<String> getHiveLocationPaths();
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/AutoCreateIcebergTagAction.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/AutoCreateIcebergTagAction.java
similarity index 98%
rename from 
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/AutoCreateIcebergTagAction.java
rename to 
amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/AutoCreateIcebergTagAction.java
index 51ef16702..0253b8791 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/AutoCreateIcebergTagAction.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/AutoCreateIcebergTagAction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.server.optimizing.maintainer;
+package org.apache.amoro.formats.iceberg.maintainer;
 
 import org.apache.amoro.config.TagConfiguration;
 import org.apache.iceberg.ManageSnapshots;
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
similarity index 88%
rename from 
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
rename to 
amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
index 83382e672..3d1d4bfb1 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.server.optimizing.maintainer;
+package org.apache.amoro.formats.iceberg.maintainer;
 
 import static 
org.apache.amoro.shade.guava32.com.google.common.primitives.Longs.min;
 
@@ -24,15 +24,16 @@ import org.apache.amoro.api.CommitMetaProducer;
 import org.apache.amoro.config.DataExpirationConfig;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.config.TagConfiguration;
+import org.apache.amoro.formats.iceberg.utils.IcebergTableUtil;
+import org.apache.amoro.formats.iceberg.utils.RollingFileCleaner;
 import org.apache.amoro.iceberg.Constants;
 import org.apache.amoro.io.AuthenticatedFileIO;
 import org.apache.amoro.io.PathInfo;
 import org.apache.amoro.io.SupportsFileSystemOperations;
-import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.server.table.TableConfigurations;
-import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
-import org.apache.amoro.server.utils.IcebergTableUtil;
-import org.apache.amoro.server.utils.RollingFileCleaner;
+import org.apache.amoro.maintainer.MaintainerMetrics;
+import org.apache.amoro.maintainer.OptimizingInfo;
+import org.apache.amoro.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainerContext;
 import 
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.shade.guava32.com.google.common.base.Strings;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
@@ -96,7 +97,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
-/** Table maintainer for iceberg tables. */
+/** Table maintainer for Iceberg tables. */
 public class IcebergTableMaintainer implements TableMaintainer {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(IcebergTableMaintainer.class);
@@ -118,22 +119,21 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
           CommitMetaProducer.DATA_EXPIRATION.name(),
           CommitMetaProducer.CLEAN_DANGLING_DELETE.name());
 
-  protected Table table;
+  public Table table;
   private final TableIdentifier tableIdentifier;
-  private final DefaultTableRuntime tableRuntime;
+  private final TableMaintainerContext context;
 
   public IcebergTableMaintainer(
-      Table table, TableIdentifier tableIdentifier, DefaultTableRuntime 
tableRuntime) {
+      Table table, TableIdentifier tableIdentifier, TableMaintainerContext 
context) {
     this.table = table;
     this.tableIdentifier = tableIdentifier;
-    this.tableRuntime = tableRuntime;
+    this.context = context;
   }
 
   @Override
   public void cleanOrphanFiles() {
-    TableConfiguration tableConfiguration = 
tableRuntime.getTableConfiguration();
-    TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
-        tableRuntime.getOrphanFilesCleaningMetrics();
+    TableConfiguration tableConfiguration = context.getTableConfiguration();
+    MaintainerMetrics metrics = context.getMetrics();
 
     if (!tableConfiguration.isCleanOrphanEnabled()) {
       return;
@@ -141,18 +141,18 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
 
     long keepTime = tableConfiguration.getOrphanExistingMinutes() * 60 * 1000;
 
-    cleanContentFiles(System.currentTimeMillis() - keepTime, 
orphanFilesCleaningMetrics);
+    cleanContentFiles(System.currentTimeMillis() - keepTime, metrics);
 
     // refresh
     table.refresh();
 
     // clear metadata files
-    cleanMetadata(System.currentTimeMillis() - keepTime, 
orphanFilesCleaningMetrics);
+    cleanMetadata(System.currentTimeMillis() - keepTime, metrics);
   }
 
   @Override
   public void cleanDanglingDeleteFiles() {
-    TableConfiguration tableConfiguration = 
tableRuntime.getTableConfiguration();
+    TableConfiguration tableConfiguration = context.getTableConfiguration();
     if (!tableConfiguration.isDeleteDanglingDeleteFilesEnabled()) {
       return;
     }
@@ -175,21 +175,21 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
 
   @Override
   public void expireSnapshots() {
-    if (!expireSnapshotEnabled(tableRuntime)) {
+    if (!expireSnapshotEnabled()) {
       return;
     }
     expireSnapshots(
-        mustOlderThan(tableRuntime, System.currentTimeMillis()),
-        tableRuntime.getTableConfiguration().getSnapshotMinCount());
+        mustOlderThan(System.currentTimeMillis()),
+        context.getTableConfiguration().getSnapshotMinCount());
   }
 
-  protected boolean expireSnapshotEnabled(DefaultTableRuntime tableRuntime) {
-    TableConfiguration tableConfiguration = 
tableRuntime.getTableConfiguration();
+  public boolean expireSnapshotEnabled() {
+    TableConfiguration tableConfiguration = context.getTableConfiguration();
     return tableConfiguration.isExpireSnapshotEnabled();
   }
 
   @VisibleForTesting
-  void expireSnapshots(long mustOlderThan, int minCount) {
+  public void expireSnapshots(long mustOlderThan, int minCount) {
     expireSnapshots(mustOlderThan, minCount, 
expireSnapshotNeedToExcludeFiles());
   }
 
@@ -229,11 +229,10 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
 
   @Override
   public void expireData() {
-    DataExpirationConfig expirationConfig =
-        tableRuntime.getTableConfiguration().getExpiringDataConfig();
+    DataExpirationConfig expirationConfig = 
context.getTableConfiguration().getExpiringDataConfig();
     try {
       Types.NestedField field = 
table.schema().findField(expirationConfig.getExpirationField());
-      if (!TableConfigurations.isValidDataExpirationField(expirationConfig, 
field, table.name())) {
+      if (!isValidDataExpirationField(expirationConfig, field, table.name())) {
         return;
       }
 
@@ -243,8 +242,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
     }
   }
 
-  protected Instant expireBaseOnRule(
-      DataExpirationConfig expirationConfig, Types.NestedField field) {
+  public Instant expireBaseOnRule(DataExpirationConfig expirationConfig, 
Types.NestedField field) {
     switch (expirationConfig.getBaseOnRule()) {
       case CURRENT_TIME:
         return Instant.now();
@@ -293,12 +291,11 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
 
   @Override
   public void autoCreateTags() {
-    TagConfiguration tagConfiguration = 
tableRuntime.getTableConfiguration().getTagConfiguration();
+    TagConfiguration tagConfiguration = 
context.getTableConfiguration().getTagConfiguration();
     new AutoCreateIcebergTagAction(table, tagConfiguration, 
LocalDateTime.now()).execute();
   }
 
-  protected void cleanContentFiles(
-      long lastTime, TableOrphanFilesCleaningMetrics 
orphanFilesCleaningMetrics) {
+  public void cleanContentFiles(long lastTime, MaintainerMetrics metrics) {
     // For clean data files, should getRuntime valid files in the base store 
and the change store,
     // so acquire in advance
     // to prevent repeated acquisition
@@ -307,19 +304,18 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
         "Starting cleaning orphan content files for table {} before {}",
         table.name(),
         Instant.ofEpochMilli(lastTime));
-    clearInternalTableContentsFiles(lastTime, validFiles, 
orphanFilesCleaningMetrics);
+    clearInternalTableContentsFiles(lastTime, validFiles, metrics);
   }
 
-  protected void cleanMetadata(
-      long lastTime, TableOrphanFilesCleaningMetrics 
orphanFilesCleaningMetrics) {
+  public void cleanMetadata(long lastTime, MaintainerMetrics metrics) {
     LOG.info(
         "Starting cleaning metadata files for table {} before {}",
         table.name(),
         Instant.ofEpochMilli(lastTime));
-    clearInternalTableMetadata(lastTime, orphanFilesCleaningMetrics);
+    clearInternalTableMetadata(lastTime, metrics);
   }
 
-  protected void doCleanDanglingDeleteFiles() {
+  public void doCleanDanglingDeleteFiles() {
     LOG.info("Starting cleaning dangling delete files for table {}", 
table.name());
     int danglingDeleteFilesCnt = clearInternalTableDanglingDeleteFiles();
     runWithCondition(
@@ -331,18 +327,18 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
                 table.name()));
   }
 
-  protected long mustOlderThan(DefaultTableRuntime tableRuntime, long now) {
+  public long mustOlderThan(long now) {
     long mustOlderThan =
         min(
             // The snapshots keep time
-            now - snapshotsKeepTime(tableRuntime),
+            now - snapshotsKeepTime(),
             // The snapshot optimizing plan based should not be expired for 
committing
-            fetchOptimizingPlanSnapshotTime(table, tableRuntime),
+            fetchOptimizingPlanSnapshotTime(table),
             // The latest non-optimized snapshot should not be expired for 
data expiring
             fetchLatestNonOptimizedSnapshotTime(table));
 
     long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(table);
-    long flinkCkRetainMillis = 
tableRuntime.getTableConfiguration().getFlinkCheckpointRetention();
+    long flinkCkRetainMillis = 
context.getTableConfiguration().getFlinkCheckpointRetention();
     if ((now - latestFlinkCommitTime) > flinkCkRetainMillis) {
       // exceed configured flink checkpoint retain time, no need to consider 
flink committed
       // snapshot
@@ -353,15 +349,15 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
     }
   }
 
-  protected long snapshotsKeepTime(DefaultTableRuntime tableRuntime) {
-    return tableRuntime.getTableConfiguration().getSnapshotTTLMinutes() * 60 * 
1000;
+  public long snapshotsKeepTime() {
+    return context.getTableConfiguration().getSnapshotTTLMinutes() * 60 * 1000;
   }
 
   protected Set<String> expireSnapshotNeedToExcludeFiles() {
     return Collections.emptySet();
   }
 
-  protected Set<String> orphanFileCleanNeedToExcludeFiles() {
+  public Set<String> orphanFileCleanNeedToExcludeFiles() {
     return Sets.union(
         IcebergTableUtil.getAllContentFilePath(table),
         IcebergTableUtil.getAllStatisticsFilePath(table));
@@ -372,9 +368,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
   }
 
   private void clearInternalTableContentsFiles(
-      long lastTime,
-      Set<String> exclude,
-      TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics) {
+      long lastTime, Set<String> exclude, MaintainerMetrics metrics) {
     String dataLocation = table.location() + File.separator + DATA_FOLDER_NAME;
     int expected = 0, deleted = 0;
 
@@ -412,12 +406,11 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
               finalDeleted,
               finalExpected,
               table.name());
-          orphanFilesCleaningMetrics.completeOrphanDataFiles(finalExpected, 
finalDeleted);
+          metrics.recordOrphanDataFilesCleaned(finalExpected, finalDeleted);
         });
   }
 
-  private void clearInternalTableMetadata(
-      long lastTime, TableOrphanFilesCleaningMetrics 
orphanFilesCleaningMetrics) {
+  private void clearInternalTableMetadata(long lastTime, MaintainerMetrics 
metrics) {
     Set<String> validFiles = getValidMetadataFiles(table);
     LOG.info("Found {} valid metadata files for table {}", validFiles.size(), 
table.name());
     Pattern excludeFileNameRegex = getExcludeFileNameRegex(table);
@@ -444,7 +437,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
                 deleted,
                 filesToDelete.size(),
                 table.name());
-            
orphanFilesCleaningMetrics.completeOrphanMetadataFiles(filesToDelete.size(), 
deleted);
+            metrics.recordOrphanMetadataFilesCleaned(filesToDelete.size(), 
deleted);
           });
     } else {
       LOG.warn(
@@ -496,14 +489,13 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
    * process planned based. This snapshot will be used when optimizing process 
committing.
    *
    * @param table table
-   * @param tableRuntime table runtime
    * @return time of snapshot for optimizing process planned based, return 
Long.MAX_VALUE if no
    *     optimizing process exists
    */
-  public static long fetchOptimizingPlanSnapshotTime(
-      Table table, DefaultTableRuntime tableRuntime) {
-    if (tableRuntime.getOptimizingStatus().isProcessing()) {
-      long fromSnapshotId = 
tableRuntime.getOptimizingProcess().getTargetSnapshotId();
+  public long fetchOptimizingPlanSnapshotTime(Table table) {
+    OptimizingInfo optimizingInfo = context.getOptimizingInfo();
+    if (optimizingInfo.isProcessing()) {
+      long fromSnapshotId = optimizingInfo.getTargetSnapshotId();
 
       for (Snapshot snapshot : table.snapshots()) {
         if (snapshot.snapshotId() == fromSnapshotId) {
@@ -656,7 +648,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
     return filesToDelete;
   }
 
-  CloseableIterable<FileEntry> fileScan(
+  public CloseableIterable<FileEntry> fileScan(
       Table table,
       Expression dataFilter,
       DataExpirationConfig expirationConfig,
@@ -706,7 +698,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
         });
   }
 
-  protected ExpireFiles expiredFileScan(
+  public ExpireFiles expiredFileScan(
       DataExpirationConfig expirationConfig, Expression dataFilter, long 
expireTimestamp) {
     Map<StructLike, DataFileFreshness> partitionFreshness = 
Maps.newConcurrentMap();
     ExpireFiles expiredFiles = new ExpireFiles();
@@ -789,6 +781,52 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
     }
   }
 
+  /**
+   * Check if the given field is valid for data expiration.
+   *
+   * @param config data expiration config
+   * @param field table nested field
+   * @param name table name
+   * @return true if field is valid
+   */
+  protected boolean isValidDataExpirationField(
+      DataExpirationConfig config, Types.NestedField field, String name) {
+    return config.isEnabled()
+        && config.getRetentionTime() > 0
+        && validateExpirationField(field, name, config.getExpirationField());
+  }
+
+  public static final Set<Type.TypeID> DATA_EXPIRATION_FIELD_TYPES =
+      Sets.newHashSet(
+          Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG, 
Type.TypeID.DATE);
+
+  private static boolean validateExpirationField(
+      Types.NestedField field, String name, String expirationField) {
+    if (Strings.isNullOrEmpty(expirationField) || null == field) {
+      LOG.warn(
+          String.format(
+              "Field(%s) used to determine data expiration is illegal for 
table(%s)",
+              expirationField, name));
+      return false;
+    }
+    Type.TypeID typeID = field.type().typeId();
+    if (!DATA_EXPIRATION_FIELD_TYPES.contains(typeID)) {
+      LOG.warn(
+          String.format(
+              "Table(%s) field(%s) type(%s) is not supported for data 
expiration, please use the "
+                  + "following types: %s",
+              name,
+              expirationField,
+              typeID.name(),
+              DATA_EXPIRATION_FIELD_TYPES.stream()
+                  .map(Enum::name)
+                  .collect(Collectors.joining(", "))));
+      return false;
+    }
+
+    return true;
+  }
+
   /**
    * Create a filter expression for expired files for the `FILE` level. For 
the `PARTITION` level,
    * we need to collect the oldest files to determine if the partition is 
obsolete, so we will not
@@ -797,7 +835,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
    * @param expirationConfig expiration configuration
    * @param expireTimestamp expired timestamp
    */
-  protected static Expression getDataExpression(
+  public static Expression getDataExpression(
       Schema schema, DataExpirationConfig expirationConfig, long 
expireTimestamp) {
     if 
(expirationConfig.getExpirationLevel().equals(DataExpirationConfig.ExpireLevel.PARTITION))
 {
       return Expressions.alwaysTrue();
@@ -831,7 +869,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
     }
   }
 
-  void expireFiles(ExpireFiles expiredFiles, long expireTimestamp) {
+  public void expireFiles(ExpireFiles expiredFiles, long expireTimestamp) {
     long snapshotId = IcebergTableUtil.getSnapshotId(table, false);
     Queue<DataFile> dataFiles = expiredFiles.dataFiles;
     Queue<DeleteFile> deleteFiles = expiredFiles.deleteFiles;
@@ -871,16 +909,35 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
         table.name());
   }
 
+  public Table getTable() {
+    return table;
+  }
+
+  public static ZoneId getDefaultZoneId(Types.NestedField expireField) {
+    Type type = expireField.type();
+    if (type.typeId() == Type.TypeID.STRING) {
+      return ZoneId.systemDefault();
+    }
+    return ZoneOffset.UTC;
+  }
+
+  private void runWithCondition(boolean condition, Runnable fun) {
+    if (condition) {
+      fun.run();
+    }
+  }
+
+  /** Internal class for expiring files. */
   public static class ExpireFiles {
     Queue<DataFile> dataFiles;
     Queue<DeleteFile> deleteFiles;
 
-    ExpireFiles() {
+    public ExpireFiles() {
       this.dataFiles = new LinkedTransferQueue<>();
       this.deleteFiles = new LinkedTransferQueue<>();
     }
 
-    void addFile(FileEntry entry) {
+    public void addFile(FileEntry entry) {
       ContentFile<?> file = entry.getFile();
       switch (file.content()) {
         case DATA:
@@ -896,6 +953,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
     }
   }
 
+  /** Internal class for tracking data file freshness. */
   public static class DataFileFreshness {
     long latestExpiredSeq;
     long latestUpdateMillis;
@@ -928,7 +986,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
     }
   }
 
-  static boolean mayExpired(
+  public static boolean mayExpired(
       FileEntry fileEntry,
       Map<StructLike, DataFileFreshness> partitionFreshness,
       Long expireTimestamp) {
@@ -961,7 +1019,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
     return expired;
   }
 
-  static boolean willNotRetain(
+  public static boolean willNotRetain(
       FileEntry fileEntry,
       DataExpirationConfig expirationConfig,
       Map<StructLike, DataFileFreshness> partitionFreshness) {
@@ -1074,23 +1132,12 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
     return !compareResults.isEmpty() && 
compareResults.stream().allMatch(Boolean::booleanValue);
   }
 
-  public Table getTable() {
-    return table;
-  }
-
-  public static ZoneId getDefaultZoneId(Types.NestedField expireField) {
-    Type type = expireField.type();
-    if (type.typeId() == Type.TypeID.STRING) {
-      return ZoneId.systemDefault();
-    }
-    return ZoneOffset.UTC;
-  }
-
+  /** Internal class for file entry with expiration info. */
   public static class FileEntry {
     private final ContentFile<?> file;
     private final Literal<Long> tsBound;
 
-    FileEntry(ContentFile<?> file, Literal<Long> tsBound) {
+    public FileEntry(ContentFile<?> file, Literal<Long> tsBound) {
       this.file = file;
       this.tsBound = tsBound;
     }
@@ -1103,10 +1150,4 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
       return tsBound;
     }
   }
-
-  private void runWithCondition(boolean condition, Runnable fun) {
-    if (condition) {
-      fun.run();
-    }
-  }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/MixedTableMaintainer.java
similarity index 80%
rename from 
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
rename to 
amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/MixedTableMaintainer.java
index 987192f89..72e4a6429 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/MixedTableMaintainer.java
@@ -16,21 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.server.optimizing.maintainer;
+package org.apache.amoro.formats.iceberg.maintainer;
 
 import static 
org.apache.amoro.shade.guava32.com.google.common.primitives.Longs.min;
 import static 
org.apache.amoro.utils.MixedTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST;
 
 import org.apache.amoro.IcebergFileEntry;
-import org.apache.amoro.TableFormat;
 import org.apache.amoro.config.DataExpirationConfig;
 import org.apache.amoro.data.FileNameRules;
+import org.apache.amoro.maintainer.MaintainerMetrics;
+import org.apache.amoro.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainerContext;
 import org.apache.amoro.scan.TableEntriesScan;
-import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.server.table.TableConfigurations;
-import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
-import org.apache.amoro.server.utils.HiveLocationUtil;
 import 
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
+import org.apache.amoro.shade.guava32.com.google.common.base.Strings;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -54,6 +53,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeMap;
 import org.slf4j.Logger;
@@ -76,22 +76,25 @@ public class MixedTableMaintainer implements 
TableMaintainer {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MixedTableMaintainer.class);
 
+  public static final Set<Type.TypeID> DATA_EXPIRATION_FIELD_TYPES =
+      Sets.newHashSet(
+          Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG, 
Type.TypeID.DATE);
+
   private final MixedTable mixedTable;
+  private final TableMaintainerContext context;
 
   private ChangeTableMaintainer changeMaintainer;
-
   private final BaseTableMaintainer baseMaintainer;
-  private final DefaultTableRuntime tableRuntime;
 
-  public MixedTableMaintainer(MixedTable mixedTable, DefaultTableRuntime 
tableRuntime) {
+  public MixedTableMaintainer(MixedTable mixedTable, TableMaintainerContext 
context) {
     this.mixedTable = mixedTable;
-    this.tableRuntime = tableRuntime;
+    this.context = context;
     if (mixedTable.isKeyedTable()) {
       changeMaintainer =
-          new ChangeTableMaintainer(mixedTable.asKeyedTable().changeTable(), 
tableRuntime);
-      baseMaintainer = new 
BaseTableMaintainer(mixedTable.asKeyedTable().baseTable(), tableRuntime);
+          new ChangeTableMaintainer(mixedTable.asKeyedTable().changeTable(), 
context);
+      baseMaintainer = new 
BaseTableMaintainer(mixedTable.asKeyedTable().baseTable(), context);
     } else {
-      baseMaintainer = new BaseTableMaintainer(mixedTable.asUnkeyedTable(), 
tableRuntime);
+      baseMaintainer = new BaseTableMaintainer(mixedTable.asUnkeyedTable(), 
context);
     }
   }
 
@@ -103,6 +106,11 @@ public class MixedTableMaintainer implements 
TableMaintainer {
     baseMaintainer.cleanOrphanFiles();
   }
 
+  @Override
+  public void cleanDanglingDeleteFiles() {
+    // Mixed table doesn't support clean dangling delete files
+  }
+
   @Override
   public void expireSnapshots() {
     if (changeMaintainer != null) {
@@ -121,13 +129,11 @@ public class MixedTableMaintainer implements 
TableMaintainer {
 
   @Override
   public void expireData() {
-    DataExpirationConfig expirationConfig =
-        tableRuntime.getTableConfiguration().getExpiringDataConfig();
+    DataExpirationConfig expirationConfig = 
context.getTableConfiguration().getExpiringDataConfig();
     try {
       Types.NestedField field =
           mixedTable.schema().findField(expirationConfig.getExpirationField());
-      if (!TableConfigurations.isValidDataExpirationField(
-          expirationConfig, field, mixedTable.name())) {
+      if (!isValidDataExpirationField(expirationConfig, field, 
mixedTable.name())) {
         return;
       }
 
@@ -137,7 +143,7 @@ public class MixedTableMaintainer implements 
TableMaintainer {
     }
   }
 
-  protected Instant expireMixedBaseOnRule(
+  public Instant expireMixedBaseOnRule(
       DataExpirationConfig expirationConfig, Types.NestedField field) {
     Instant changeInstant =
         Optional.ofNullable(changeMaintainer).isPresent()
@@ -250,23 +256,21 @@ public class MixedTableMaintainer implements 
TableMaintainer {
     throw new UnsupportedOperationException("Mixed table doesn't support auto 
create tags");
   }
 
-  protected void cleanContentFiles(
-      long lastTime, TableOrphanFilesCleaningMetrics 
orphanFilesCleaningMetrics) {
+  public void cleanContentFiles(long lastTime, MaintainerMetrics metrics) {
     if (changeMaintainer != null) {
-      changeMaintainer.cleanContentFiles(lastTime, orphanFilesCleaningMetrics);
+      changeMaintainer.cleanContentFiles(lastTime, metrics);
     }
-    baseMaintainer.cleanContentFiles(lastTime, orphanFilesCleaningMetrics);
+    baseMaintainer.cleanContentFiles(lastTime, metrics);
   }
 
-  protected void cleanMetadata(
-      long lastTime, TableOrphanFilesCleaningMetrics 
orphanFilesCleaningMetrics) {
+  public void cleanMetadata(long lastTime, MaintainerMetrics metrics) {
     if (changeMaintainer != null) {
-      changeMaintainer.cleanMetadata(lastTime, orphanFilesCleaningMetrics);
+      changeMaintainer.cleanMetadata(lastTime, metrics);
     }
-    baseMaintainer.cleanMetadata(lastTime, orphanFilesCleaningMetrics);
+    baseMaintainer.cleanMetadata(lastTime, metrics);
   }
 
-  protected void doCleanDanglingDeleteFiles() {
+  public void doCleanDanglingDeleteFiles() {
     if (changeMaintainer != null) {
       changeMaintainer.doCleanDanglingDeleteFiles();
     }
@@ -281,50 +285,89 @@ public class MixedTableMaintainer implements 
TableMaintainer {
     return baseMaintainer;
   }
 
+  /**
+   * Check if the given field is valid for data expiration.
+   *
+   * @param config data expiration config
+   * @param field table nested field
+   * @param name table name
+   * @return true if field is valid
+   */
+  protected boolean isValidDataExpirationField(
+      DataExpirationConfig config, Types.NestedField field, String name) {
+    return config.isEnabled()
+        && config.getRetentionTime() > 0
+        && validateExpirationField(field, name, config.getExpirationField());
+  }
+
+  private static boolean validateExpirationField(
+      Types.NestedField field, String name, String expirationField) {
+    if (Strings.isNullOrEmpty(expirationField) || null == field) {
+      LOG.warn(
+          String.format(
+              "Field(%s) used to determine data expiration is illegal for 
table(%s)",
+              expirationField, name));
+      return false;
+    }
+    Type.TypeID typeID = field.type().typeId();
+    if (!DATA_EXPIRATION_FIELD_TYPES.contains(typeID)) {
+      LOG.warn(
+          String.format(
+              "Table(%s) field(%s) type(%s) is not supported for data 
expiration, please use the "
+                  + "following types: %s",
+              name,
+              expirationField,
+              typeID.name(),
+              DATA_EXPIRATION_FIELD_TYPES.stream()
+                  .map(Enum::name)
+                  .collect(Collectors.joining(", "))));
+      return false;
+    }
+
+    return true;
+  }
+
+  /** Inner class for maintaining change table of mixed table. */
   public class ChangeTableMaintainer extends IcebergTableMaintainer {
 
     private static final int DATA_FILE_LIST_SPLIT = 3000;
 
     private final UnkeyedTable unkeyedTable;
 
-    public ChangeTableMaintainer(UnkeyedTable unkeyedTable, 
DefaultTableRuntime tableRuntime) {
-      super(unkeyedTable, mixedTable.id(), tableRuntime);
+    public ChangeTableMaintainer(UnkeyedTable unkeyedTable, 
TableMaintainerContext context) {
+      super(unkeyedTable, mixedTable.id(), context);
       this.unkeyedTable = unkeyedTable;
     }
 
     @Override
     @VisibleForTesting
-    void expireSnapshots(long mustOlderThan, int minCount) {
+    public void expireSnapshots(long mustOlderThan, int minCount) {
       expireFiles(mustOlderThan);
       super.expireSnapshots(mustOlderThan, minCount);
     }
 
     @Override
     public void expireSnapshots() {
-      if (!expireSnapshotEnabled(tableRuntime)) {
+      if (!expireSnapshotEnabled()) {
         return;
       }
       long now = System.currentTimeMillis();
-      expireFiles(now - snapshotsKeepTime(tableRuntime));
-      expireSnapshots(
-          mustOlderThan(tableRuntime, now),
-          tableRuntime.getTableConfiguration().getSnapshotMinCount());
+      expireFiles(now - getSnapshotsKeepTime());
+      expireSnapshots(getMustOlderThan(now), 
context.getTableConfiguration().getSnapshotMinCount());
     }
 
-    @Override
-    protected long mustOlderThan(DefaultTableRuntime tableRuntime, long now) {
+    private long getSnapshotsKeepTime() {
+      return context.getTableConfiguration().getChangeDataTTLMinutes() * 60 * 
1000;
+    }
+
+    private long getMustOlderThan(long now) {
       return min(
           // The change data ttl time
-          now - snapshotsKeepTime(tableRuntime),
+          now - getSnapshotsKeepTime(),
           // The latest flink committed snapshot should not be expired for 
recovering flink job
           fetchLatestFlinkCommittedSnapshotTime(table));
     }
 
-    @Override
-    protected long snapshotsKeepTime(DefaultTableRuntime tableRuntime) {
-      return tableRuntime.getTableConfiguration().getChangeDataTTLMinutes() * 
60 * 1000;
-    }
-
     public void expireFiles(long ttlPoint) {
       List<IcebergFileEntry> expiredDataFileEntries = 
getExpiredDataFileEntries(ttlPoint);
       deleteChangeFile(expiredDataFileEntries);
@@ -440,14 +483,14 @@ public class MixedTableMaintainer implements 
TableMaintainer {
     }
   }
 
+  /** Inner class for maintaining base table of mixed table. */
   public class BaseTableMaintainer extends IcebergTableMaintainer {
-    private final Set<String> hiveFiles = Sets.newHashSet();
+    private final Set<String> hiveFiles;
 
-    public BaseTableMaintainer(UnkeyedTable unkeyedTable, DefaultTableRuntime 
tableRuntime) {
-      super(unkeyedTable, mixedTable.id(), tableRuntime);
-      if (unkeyedTable.format() == TableFormat.MIXED_HIVE) {
-        hiveFiles.addAll(HiveLocationUtil.getHiveLocation(mixedTable));
-      }
+    public BaseTableMaintainer(UnkeyedTable unkeyedTable, 
TableMaintainerContext context) {
+      super(unkeyedTable, mixedTable.id(), context);
+      // Get Hive location paths from context, no longer depends on 
HiveLocationUtil
+      this.hiveFiles = context.getHiveLocationPaths();
     }
 
     @Override
@@ -457,14 +500,14 @@ public class MixedTableMaintainer implements 
TableMaintainer {
     }
 
     @Override
-    protected Set<String> expireSnapshotNeedToExcludeFiles() {
+    public Set<String> expireSnapshotNeedToExcludeFiles() {
       return hiveFiles;
     }
 
     @Override
-    protected long mustOlderThan(DefaultTableRuntime tableRuntime, long now) {
+    public long mustOlderThan(long now) {
       DataExpirationConfig expiringDataConfig =
-          tableRuntime.getTableConfiguration().getExpiringDataConfig();
+          context.getTableConfiguration().getExpiringDataConfig();
       long dataExpiringSnapshotTime =
           expiringDataConfig.isEnabled()
                   && expiringDataConfig.getBaseOnRule()
@@ -474,9 +517,9 @@ public class MixedTableMaintainer implements 
TableMaintainer {
 
       return min(
           // The snapshots keep time for base store
-          now - snapshotsKeepTime(tableRuntime),
+          now - snapshotsKeepTime(),
           // The snapshot optimizing plan based should not be expired for 
committing
-          fetchOptimizingPlanSnapshotTime(table, tableRuntime),
+          fetchOptimizingPlanSnapshotTime(table),
           // The latest non-optimized snapshot should not be expired for data 
expiring
           dataExpiringSnapshotTime,
           // The latest flink committed snapshot should not be expired for 
recovering flink job
@@ -505,6 +548,7 @@ public class MixedTableMaintainer implements 
TableMaintainer {
     }
   }
 
+  /** Entry wrapper for mixed table files with expiration information. */
   public static class MixedFileEntry extends IcebergTableMaintainer.FileEntry {
 
     private final boolean isChange;
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtil.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtil.java
new file mode 100644
index 000000000..23a5b48a2
--- /dev/null
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtil.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.formats.iceberg.utils;
+
+import org.apache.amoro.IcebergFileEntry;
+import org.apache.amoro.iceberg.Constants;
+import org.apache.amoro.scan.TableEntriesScan;
+import org.apache.amoro.shade.guava32.com.google.common.base.Predicate;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.utils.TableFileUtil;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.ReachableFileUtil;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.io.CloseableIterable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Util class for Iceberg table operations in format-iceberg module. */
+public class IcebergTableUtil {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergTableUtil.class);
+
+  private IcebergTableUtil() {}
+
+  public static long getSnapshotId(Table table, boolean refresh) {
+    Snapshot currentSnapshot = getSnapshot(table, refresh);
+    if (currentSnapshot == null) {
+      return Constants.INVALID_SNAPSHOT_ID;
+    } else {
+      return currentSnapshot.snapshotId();
+    }
+  }
+
+  public static Snapshot getSnapshot(Table table, boolean refresh) {
+    if (refresh) {
+      table.refresh();
+    }
+    return table.currentSnapshot();
+  }
+
+  public static Optional<Snapshot> findFirstMatchSnapshot(
+      Table table, Predicate<Snapshot> predicate) {
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Collections.reverse(snapshots);
+    return Optional.ofNullable(Iterables.tryFind(snapshots, 
predicate).orNull());
+  }
+
+  /**
+   * Find the latest optimizing snapshot in the table.
+   *
+   * @param table the Iceberg table
+   * @return Optional snapshot
+   */
+  public static Optional<Snapshot> findLatestOptimizingSnapshot(Table table) {
+    return IcebergTableUtil.findFirstMatchSnapshot(
+        table,
+        snapshot ->
+            snapshot.summary().containsValue("OPTIMIZE")
+                && DataOperations.REPLACE.equals(snapshot.operation()));
+  }
+
+  public static Set<String> getAllContentFilePath(Table internalTable) {
+    Set<String> validFilesPath = new HashSet<>();
+
+    TableEntriesScan entriesScan =
+        TableEntriesScan.builder(internalTable)
+            .includeFileContent(
+                FileContent.DATA, FileContent.POSITION_DELETES, 
FileContent.EQUALITY_DELETES)
+            .allEntries()
+            .build();
+    try (CloseableIterable<IcebergFileEntry> entries = entriesScan.entries()) {
+      for (IcebergFileEntry entry : entries) {
+        
validFilesPath.add(TableFileUtil.getUriPath(entry.getFile().path().toString()));
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return validFilesPath;
+  }
+
+  public static Set<String> getAllStatisticsFilePath(Table table) {
+    return ReachableFileUtil.statisticsFilesLocations(table).stream()
+        .map(TableFileUtil::getUriPath)
+        .collect(Collectors.toSet());
+  }
+
+  public static Set<DeleteFile> getDanglingDeleteFiles(Table internalTable) {
+    if (internalTable.currentSnapshot() == null) {
+      return Collections.emptySet();
+    }
+    long snapshotId = internalTable.currentSnapshot().snapshotId();
+    Set<String> deleteFilesPath = new HashSet<>();
+    TableScan tableScan = internalTable.newScan().useSnapshot(snapshotId);
+    try (CloseableIterable<FileScanTask> fileScanTasks = 
tableScan.planFiles()) {
+      for (FileScanTask fileScanTask : fileScanTasks) {
+        for (DeleteFile delete : fileScanTask.deletes()) {
+          deleteFilesPath.add(delete.path().toString());
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("table scan plan files error", e);
+      return Collections.emptySet();
+    }
+
+    Set<DeleteFile> danglingDeleteFiles = new HashSet<>();
+    TableEntriesScan entriesScan =
+        TableEntriesScan.builder(internalTable)
+            .useSnapshot(snapshotId)
+            .includeFileContent(FileContent.EQUALITY_DELETES, 
FileContent.POSITION_DELETES)
+            .build();
+    try (CloseableIterable<IcebergFileEntry> entries = entriesScan.entries()) {
+      for (IcebergFileEntry entry : entries) {
+        ContentFile<?> file = entry.getFile();
+        String path = file.path().toString();
+        if (!deleteFilesPath.contains(path)) {
+          danglingDeleteFiles.add((DeleteFile) file);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Error when fetch iceberg entries", e);
+    }
+
+    return danglingDeleteFiles;
+  }
+
+  /**
+   * Fetch all manifest files of an Iceberg Table.
+   *
+   * @param table An iceberg table, or maybe base store or change store of 
mixed-iceberg format.
+   * @return Path set of all valid manifest files.
+   */
+  public static Set<String> getAllManifestFiles(Table table) {
+    TableOperations ops = ((HasTableOperations) table).operations();
+
+    Table allManifest =
+        MetadataTableUtils.createMetadataTableInstance(
+            ops,
+            table.name(),
+            table.name() + "#" + MetadataTableType.ALL_MANIFESTS.name(),
+            MetadataTableType.ALL_MANIFESTS);
+
+    Set<String> allManifestFiles =
+        Collections.newSetFromMap(new 
java.util.concurrent.ConcurrentHashMap());
+    TableScan scan = allManifest.newScan().select("path");
+
+    CloseableIterable<FileScanTask> tasks = scan.planFiles();
+    CloseableIterable<CloseableIterable<StructLike>> transform =
+        CloseableIterable.transform(tasks, task -> task.asDataTask().rows());
+
+    try (CloseableIterable<StructLike> rows = 
CloseableIterable.concat(transform)) {
+      rows.forEach(r -> allManifestFiles.add(r.get(0, String.class)));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return allManifestFiles;
+  }
+}
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/RollingFileCleaner.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/RollingFileCleaner.java
new file mode 100644
index 000000000..e453af95e
--- /dev/null
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/RollingFileCleaner.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.formats.iceberg.utils;
+
+import org.apache.amoro.io.AuthenticatedFileIO;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.utils.TableFileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Rolling file cleaner for Iceberg table maintenance operations. */
+public class RollingFileCleaner {
+  private final Set<String> collectedFiles = Sets.newConcurrentHashSet();
+  private final Set<String> excludeFiles;
+  private final Set<String> parentDirectories = Sets.newConcurrentHashSet();
+
+  private static final int CLEANED_FILE_GROUP_SIZE = 1_000;
+
+  private final AtomicInteger fileCounter = new AtomicInteger(0);
+  private final AtomicInteger cleanedFileCounter = new AtomicInteger(0);
+
+  private final AuthenticatedFileIO fileIO;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RollingFileCleaner.class);
+
+  public RollingFileCleaner(AuthenticatedFileIO fileIO, Set<String> 
excludeFiles) {
+    this.fileIO = fileIO;
+    this.excludeFiles =
+        excludeFiles != null
+            ? Sets.newConcurrentHashSet(excludeFiles)
+            : Sets.newConcurrentHashSet();
+  }
+
+  public void addFile(String filePath) {
+    if (isFileExcluded(filePath)) {
+      LOG.debug("File {} is excluded from cleaning", filePath);
+      return;
+    }
+
+    collectedFiles.add(filePath);
+    String parentDir = TableFileUtil.getParent(filePath);
+    parentDirectories.add(parentDir);
+    int currentCount = fileCounter.incrementAndGet();
+
+    if (currentCount % CLEANED_FILE_GROUP_SIZE == 0) {
+      doCleanFiles();
+    }
+  }
+
+  private boolean isFileExcluded(String filePath) {
+    if (excludeFiles.isEmpty()) {
+      return false;
+    }
+
+    if (excludeFiles.contains(filePath)) {
+      return true;
+    }
+
+    String uriPath = URI.create(filePath).getPath();
+    if (excludeFiles.contains(uriPath)) {
+      return true;
+    }
+
+    String parentPath = new Path(uriPath).getParent().toString();
+    return excludeFiles.contains(parentPath);
+  }
+
+  private void doCleanFiles() {
+    if (collectedFiles.isEmpty()) {
+      return;
+    }
+
+    if (fileIO.supportBulkOperations()) {
+      try {
+        fileIO.asBulkFileIO().deleteFiles(collectedFiles);
+        cleanedFileCounter.addAndGet(collectedFiles.size());
+      } catch (BulkDeletionFailureException e) {
+        LOG.warn("Failed to delete {} expired files in bulk", 
e.numberFailedObjects());
+      }
+    } else {
+      for (String filePath : collectedFiles) {
+        try {
+          fileIO.deleteFile(filePath);
+          cleanedFileCounter.incrementAndGet();
+        } catch (Exception e) {
+          LOG.warn("Failed to delete expired file: {}", filePath, e);
+        }
+      }
+    }
+    // Try to delete empty parent directories
+    for (String parentDir : parentDirectories) {
+      TableFileUtil.deleteEmptyDirectory(fileIO, parentDir, excludeFiles);
+    }
+    parentDirectories.clear();
+
+    LOG.debug("Cleaned expired a file group, total files: {}", 
collectedFiles.size());
+
+    collectedFiles.clear();
+  }
+
+  public int fileCount() {
+    return fileCounter.get();
+  }
+
+  public int cleanedFileCount() {
+    return cleanedFileCounter.get();
+  }
+
+  public void clear() {
+    if (!collectedFiles.isEmpty()) {
+      doCleanFiles();
+    }
+
+    collectedFiles.clear();
+    excludeFiles.clear();
+  }
+}
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestAutoCreateIcebergTagAction.java
 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/maintainer/TestAutoCreateIcebergTagAction.java
similarity index 85%
rename from 
amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestAutoCreateIcebergTagAction.java
rename to 
amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/maintainer/TestAutoCreateIcebergTagAction.java
index 1469d0264..4eb637edd 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestAutoCreateIcebergTagAction.java
+++ 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/maintainer/TestAutoCreateIcebergTagAction.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.server.optimizing.maintainer;
+package org.apache.amoro.formats.iceberg.maintainer;
 
 import org.apache.amoro.BasicTableTestHelper;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.catalog.BasicCatalogTestHelper;
 import org.apache.amoro.catalog.TableTestBase;
 import org.apache.amoro.config.TagConfiguration;
-import org.apache.amoro.server.table.TableConfigurations;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.table.TableProperties;
+import org.apache.amoro.utils.CompatiblePropertyUtil;
 import org.apache.iceberg.ExpireSnapshots;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
@@ -40,6 +40,8 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+import java.util.Map;
 
 public class TestAutoCreateIcebergTagAction extends TableTestBase {
 
@@ -57,7 +59,7 @@ public class TestAutoCreateIcebergTagAction extends 
TableTestBase {
 
   @NotNull
   private AutoCreateIcebergTagAction newAutoCreateIcebergTagAction(Table 
table, LocalDateTime now) {
-    TagConfiguration tagConfig = 
TableConfigurations.parseTagConfiguration(table.properties());
+    TagConfiguration tagConfig = parseTagConfiguration(table.properties());
     return new AutoCreateIcebergTagAction(table, tagConfig, now);
   }
 
@@ -347,6 +349,58 @@ public class TestAutoCreateIcebergTagAction extends 
TableTestBase {
     Assert.assertEquals(expectedTriggerTime, actualTriggerTime);
   }
 
+  /**
+   * Parse tag configuration from table properties. This is a test helper 
method that replicates the
+   * logic from TableConfigurations to avoid AMS dependency.
+   */
+  private TagConfiguration parseTagConfiguration(Map<String, String> 
tableProperties) {
+    TagConfiguration tagConfig = new TagConfiguration();
+    tagConfig.setAutoCreateTag(
+        CompatiblePropertyUtil.propertyAsBoolean(
+            tableProperties,
+            TableProperties.ENABLE_AUTO_CREATE_TAG,
+            TableProperties.ENABLE_AUTO_CREATE_TAG_DEFAULT));
+    tagConfig.setTriggerPeriod(
+        TagConfiguration.Period.valueOf(
+            CompatiblePropertyUtil.propertyAsString(
+                    tableProperties,
+                    TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD,
+                    TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD_DEFAULT)
+                .toUpperCase(Locale.ROOT)));
+
+    String defaultFormat;
+    switch (tagConfig.getTriggerPeriod()) {
+      case DAILY:
+        defaultFormat = TableProperties.AUTO_CREATE_TAG_FORMAT_DAILY_DEFAULT;
+        break;
+      case HOURLY:
+        defaultFormat = TableProperties.AUTO_CREATE_TAG_FORMAT_HOURLY_DEFAULT;
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported trigger period: " + tagConfig.getTriggerPeriod());
+    }
+    tagConfig.setTagFormat(
+        CompatiblePropertyUtil.propertyAsString(
+            tableProperties, TableProperties.AUTO_CREATE_TAG_FORMAT, 
defaultFormat));
+    tagConfig.setTriggerOffsetMinutes(
+        CompatiblePropertyUtil.propertyAsInt(
+            tableProperties,
+            TableProperties.AUTO_CREATE_TAG_TRIGGER_OFFSET_MINUTES,
+            TableProperties.AUTO_CREATE_TAG_TRIGGER_OFFSET_MINUTES_DEFAULT));
+    tagConfig.setMaxDelayMinutes(
+        CompatiblePropertyUtil.propertyAsInt(
+            tableProperties,
+            TableProperties.AUTO_CREATE_TAG_MAX_DELAY_MINUTES,
+            TableProperties.AUTO_CREATE_TAG_MAX_DELAY_MINUTES_DEFAULT));
+    tagConfig.setTagMaxAgeMs(
+        CompatiblePropertyUtil.propertyAsLong(
+            tableProperties,
+            TableProperties.AUTO_CREATE_TAG_MAX_AGE_MS,
+            TableProperties.AUTO_CREATE_TAG_MAX_AGE_MS_DEFAULT));
+    return tagConfig;
+  }
+
   private long getOffsetMinutesOfToday(long millis) {
     LocalDateTime now = fromEpochMillis(millis);
     LocalDateTime today = LocalDateTime.of(now.toLocalDate(), 
LocalTime.ofSecondOfDay(0));

Reply via email to