This is an automated email from the ASF dual-hosted git repository.

baiyangtx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b9a74dcc [Improvement]: Refactor code to replace DefaultTableRuntime 
with TableRuntime interface (#3644)
2b9a74dcc is described below

commit 2b9a74dcc9d8f02847ca262977484fb12387162a
Author: baiyangtx <[email protected]>
AuthorDate: Tue Aug 5 15:31:46 2025 +0800

    [Improvement]: Refactor code to replace DefaultTableRuntime with 
TableRuntime interface (#3644)
    
    * [Improvement]: Refactor make TableService return TableRuntime interface
    
    * [Improvement]: Refactor make RuntimeHandlerChain depends on  TableRuntime 
interface
    
    * c
    
    * remove table runtime from table maintainer
    
    * rename
    
    * rename
    
    ---------
    
    Co-authored-by: zhangyongxiang.alpha <[email protected]>
    Co-authored-by: ZhouJinsong <[email protected]>
---
 .../amoro/server/DefaultOptimizingService.java     | 31 ++++++++------
 .../maintainer/IcebergTableMaintainer.java         | 35 +++++++++-------
 .../maintainer/MixedTableMaintainer.java           | 49 ++++++++++++----------
 .../optimizing/maintainer/TableMaintainer.java     | 29 +++----------
 ...{TableMaintainer.java => TableMaintainers.java} | 48 ++++++---------------
 .../scheduler/PeriodicExternalScheduler.java       | 26 ++++++++----
 .../server/scheduler/PeriodicTableScheduler.java   | 29 ++++++-------
 .../scheduler/inline/BlockerExpiringExecutor.java  | 10 ++---
 .../DanglingDeleteFilesCleaningExecutor.java       | 18 ++++----
 .../scheduler/inline/DataExpiringExecutor.java     | 16 +++----
 .../scheduler/inline/HiveCommitSyncExecutor.java   |  8 ++--
 .../scheduler/inline/OptimizingCommitExecutor.java | 25 ++++++-----
 .../inline/OptimizingExpiringExecutor.java         | 10 ++---
 .../inline/OrphanFilesCleaningExecutor.java        | 16 +++----
 .../inline/SnapshotsExpiringExecutor.java          | 16 +++----
 .../inline/TableRuntimeRefreshExecutor.java        | 30 ++++++++-----
 .../scheduler/inline/TagsAutoCreatingExecutor.java | 16 +++----
 .../amoro/server/table/DefaultTableRuntime.java    |  1 +
 .../amoro/server/table/DefaultTableService.java    | 22 +++-------
 .../amoro/server/table/RuntimeHandlerChain.java    | 24 +++++------
 .../apache/amoro/server/table/TableService.java    |  3 +-
 .../apache/amoro/server/AMSServiceTestBase.java    |  5 +++
 .../amoro/server/RestCatalogServiceTestBase.java   |  6 +--
 .../amoro/server/TestDefaultOptimizingService.java | 16 +++----
 .../server/dashboard/TestOverviewManager.java      |  4 +-
 .../optimizing/maintainer/TestDataExpire.java      | 16 +++----
 .../optimizing/maintainer/TestOrphanFileClean.java | 16 +++----
 .../maintainer/TestOrphanFileCleanHive.java        |  2 +-
 .../maintainer/TestOrphanFileCleanIceberg.java     |  5 ++-
 .../optimizing/maintainer/TestSnapshotExpire.java  | 48 +++++++++++----------
 .../maintainer/TestSnapshotExpireHive.java         |  2 +-
 .../amoro/server/table/AMSTableTestBase.java       |  3 +-
 .../table/TestDefaultTableRuntimeHandler.java      | 37 +++++++---------
 .../table/TestDefaultTableRuntimeManager.java      |  2 +-
 .../table/TestSyncTableOfExternalCatalog.java      |  2 +-
 .../amoro/server/table/TestTableManager.java       |  3 +-
 .../server/table/TestTableSummaryMetrics.java      |  4 +-
 .../main/java/org/apache/amoro/TableRuntime.java   |  3 ++
 38 files changed, 314 insertions(+), 322 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index c69e9194e..ffdfd72b8 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -20,6 +20,7 @@ package org.apache.amoro.server;
 
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.OptimizerProperties;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.api.OptimizerRegisterInfo;
 import org.apache.amoro.api.OptimizingService;
 import org.apache.amoro.api.OptimizingTask;
@@ -290,7 +291,7 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
       return false;
     }
     long tableId = processMeta.getTableId();
-    DefaultTableRuntime tableRuntime = tableService.getRuntime(tableId);
+    DefaultTableRuntime tableRuntime = (DefaultTableRuntime) 
tableService.getRuntime(tableId);
     if (tableRuntime == null) {
       return false;
     }
@@ -379,17 +380,17 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
   private class TableRuntimeHandlerImpl extends RuntimeHandlerChain {
 
     @Override
-    public void handleStatusChanged(
-        DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {
-      if 
(!tableRuntime.getOptimizingState().getOptimizingStatus().isProcessing()) {
-        
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
-            .ifPresent(q -> q.refreshTable(tableRuntime));
+    public void handleStatusChanged(TableRuntime tableRuntime, 
OptimizingStatus originalStatus) {
+      DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) 
tableRuntime;
+      if 
(!defaultTableRuntime.getOptimizingState().getOptimizingStatus().isProcessing())
 {
+        
getOptionalQueueByGroup(defaultTableRuntime.getOptimizingState().getOptimizerGroup())
+            .ifPresent(q -> q.refreshTable(defaultTableRuntime));
       }
     }
 
     @Override
-    public void handleConfigChanged(
-        DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+    public void handleConfigChanged(TableRuntime runtime, TableConfiguration 
originalConfig) {
+      DefaultTableRuntime tableRuntime = (DefaultTableRuntime) runtime;
       String originalGroup = 
originalConfig.getOptimizingConfig().getOptimizerGroup();
       if 
(!tableRuntime.getOptimizingState().getOptimizerGroup().equals(originalGroup)) {
         getOptionalQueueByGroup(originalGroup).ifPresent(q -> 
q.releaseTable(tableRuntime));
@@ -399,21 +400,27 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
     }
 
     @Override
-    public void handleTableAdded(AmoroTable<?> table, DefaultTableRuntime 
tableRuntime) {
+    public void handleTableAdded(AmoroTable<?> table, TableRuntime runtime) {
+      DefaultTableRuntime tableRuntime = (DefaultTableRuntime) runtime;
       
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
           .ifPresent(q -> q.refreshTable(tableRuntime));
     }
 
     @Override
-    public void handleTableRemoved(DefaultTableRuntime tableRuntime) {
+    public void handleTableRemoved(TableRuntime runtime) {
+      DefaultTableRuntime tableRuntime = (DefaultTableRuntime) runtime;
       
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
           .ifPresent(queue -> queue.releaseTable(tableRuntime));
     }
 
     @Override
-    protected void initHandler(List<DefaultTableRuntime> tableRuntimeList) {
+    protected void initHandler(List<TableRuntime> tableRuntimeList) {
       LOG.info("OptimizerManagementService begin initializing");
-      loadOptimizingQueues(tableRuntimeList);
+      loadOptimizingQueues(
+          tableRuntimeList.stream()
+              .filter(t -> t instanceof DefaultTableRuntime)
+              .map(t -> (DefaultTableRuntime) t)
+              .collect(Collectors.toList()));
       optimizerKeeper.start();
       optimizingConfigWatcher.start();
       LOG.info("SuspendingDetector for Optimizer has been started.");
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
index 42d5428b5..76e4a190c 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
@@ -23,6 +23,7 @@ import static 
org.apache.amoro.shade.guava32.com.google.common.primitives.Longs.
 import org.apache.amoro.api.CommitMetaProducer;
 import org.apache.amoro.config.DataExpirationConfig;
 import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.config.TagConfiguration;
 import org.apache.amoro.iceberg.Constants;
 import org.apache.amoro.io.AuthenticatedFileIO;
 import org.apache.amoro.io.PathInfo;
@@ -37,6 +38,7 @@ import 
org.apache.amoro.shade.guava32.com.google.common.base.Strings;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.table.TableIdentifier;
 import org.apache.amoro.utils.TableFileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.ContentFile;
@@ -119,13 +121,18 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
           CommitMetaProducer.CLEAN_DANGLING_DELETE.name());
 
   protected Table table;
+  private final TableIdentifier tableIdentifier;
+  private final DefaultTableRuntime tableRuntime;
 
-  public IcebergTableMaintainer(Table table) {
+  public IcebergTableMaintainer(
+      Table table, TableIdentifier tableIdentifier, DefaultTableRuntime 
tableRuntime) {
     this.table = table;
+    this.tableIdentifier = tableIdentifier;
+    this.tableRuntime = tableRuntime;
   }
 
   @Override
-  public void cleanOrphanFiles(DefaultTableRuntime tableRuntime) {
+  public void cleanOrphanFiles() {
     TableConfiguration tableConfiguration = 
tableRuntime.getTableConfiguration();
     TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
         tableRuntime.getOptimizingState().getOrphanFilesCleaningMetrics();
@@ -146,9 +153,8 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
   }
 
   @Override
-  public void cleanDanglingDeleteFiles(DefaultTableRuntime tableRuntime) {
+  public void cleanDanglingDeleteFiles() {
     TableConfiguration tableConfiguration = 
tableRuntime.getTableConfiguration();
-
     if (!tableConfiguration.isDeleteDanglingDeleteFilesEnabled()) {
       return;
     }
@@ -161,7 +167,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
         
Optional.ofNullable(currentSnapshot.summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP));
     if (totalDeleteFiles.isPresent() && Long.parseLong(totalDeleteFiles.get()) 
> 0) {
       // clear dangling delete files
-      cleanDanglingDeleteFiles();
+      doCleanDanglingDeleteFiles();
     } else {
       LOG.debug(
           "There are no delete files here, so there is no need to clean 
dangling delete file for table {}",
@@ -170,7 +176,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
   }
 
   @Override
-  public void expireSnapshots(DefaultTableRuntime tableRuntime) {
+  public void expireSnapshots() {
     if (!expireSnapshotEnabled(tableRuntime)) {
       return;
     }
@@ -247,10 +253,10 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
   }
 
   @Override
-  public void expireData(DefaultTableRuntime tableRuntime) {
+  public void expireData() {
+    DataExpirationConfig expirationConfig =
+        tableRuntime.getTableConfiguration().getExpiringDataConfig();
     try {
-      DataExpirationConfig expirationConfig =
-          tableRuntime.getTableConfiguration().getExpiringDataConfig();
       Types.NestedField field = 
table.schema().findField(expirationConfig.getExpirationField());
       if (!TableConfigurations.isValidDataExpirationField(expirationConfig, 
field, table.name())) {
         return;
@@ -258,7 +264,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
 
       expireDataFrom(expirationConfig, expireBaseOnRule(expirationConfig, 
field));
     } catch (Throwable t) {
-      LOG.error("Unexpected purge error for table {} ", 
tableRuntime.getTableIdentifier(), t);
+      LOG.error("Unexpected purge error for table {} ", tableIdentifier, t);
     }
   }
 
@@ -311,10 +317,9 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
   }
 
   @Override
-  public void autoCreateTags(DefaultTableRuntime tableRuntime) {
-    new AutoCreateIcebergTagAction(
-            table, tableRuntime.getTableConfiguration().getTagConfiguration(), 
LocalDateTime.now())
-        .execute();
+  public void autoCreateTags() {
+    TagConfiguration tagConfiguration = 
tableRuntime.getTableConfiguration().getTagConfiguration();
+    new AutoCreateIcebergTagAction(table, tagConfiguration, 
LocalDateTime.now()).execute();
   }
 
   protected void cleanContentFiles(
@@ -339,7 +344,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
     clearInternalTableMetadata(lastTime, orphanFilesCleaningMetrics);
   }
 
-  protected void cleanDanglingDeleteFiles() {
+  protected void doCleanDanglingDeleteFiles() {
     LOG.info("Starting cleaning dangling delete files for table {}", 
table.name());
     int danglingDeleteFilesCnt = clearInternalTableDanglingDeleteFiles();
     runWithCondition(
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
index a6e998c8a..987192f89 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
@@ -81,31 +81,34 @@ public class MixedTableMaintainer implements 
TableMaintainer {
   private ChangeTableMaintainer changeMaintainer;
 
   private final BaseTableMaintainer baseMaintainer;
+  private final DefaultTableRuntime tableRuntime;
 
-  public MixedTableMaintainer(MixedTable mixedTable) {
+  public MixedTableMaintainer(MixedTable mixedTable, DefaultTableRuntime 
tableRuntime) {
     this.mixedTable = mixedTable;
+    this.tableRuntime = tableRuntime;
     if (mixedTable.isKeyedTable()) {
-      changeMaintainer = new 
ChangeTableMaintainer(mixedTable.asKeyedTable().changeTable());
-      baseMaintainer = new 
BaseTableMaintainer(mixedTable.asKeyedTable().baseTable());
+      changeMaintainer =
+          new ChangeTableMaintainer(mixedTable.asKeyedTable().changeTable(), 
tableRuntime);
+      baseMaintainer = new 
BaseTableMaintainer(mixedTable.asKeyedTable().baseTable(), tableRuntime);
     } else {
-      baseMaintainer = new BaseTableMaintainer(mixedTable.asUnkeyedTable());
+      baseMaintainer = new BaseTableMaintainer(mixedTable.asUnkeyedTable(), 
tableRuntime);
     }
   }
 
   @Override
-  public void cleanOrphanFiles(DefaultTableRuntime tableRuntime) {
+  public void cleanOrphanFiles() {
     if (changeMaintainer != null) {
-      changeMaintainer.cleanOrphanFiles(tableRuntime);
+      changeMaintainer.cleanOrphanFiles();
     }
-    baseMaintainer.cleanOrphanFiles(tableRuntime);
+    baseMaintainer.cleanOrphanFiles();
   }
 
   @Override
-  public void expireSnapshots(DefaultTableRuntime tableRuntime) {
+  public void expireSnapshots() {
     if (changeMaintainer != null) {
-      changeMaintainer.expireSnapshots(tableRuntime);
+      changeMaintainer.expireSnapshots();
     }
-    baseMaintainer.expireSnapshots(tableRuntime);
+    baseMaintainer.expireSnapshots();
   }
 
   @VisibleForTesting
@@ -117,10 +120,10 @@ public class MixedTableMaintainer implements 
TableMaintainer {
   }
 
   @Override
-  public void expireData(DefaultTableRuntime tableRuntime) {
+  public void expireData() {
+    DataExpirationConfig expirationConfig =
+        tableRuntime.getTableConfiguration().getExpiringDataConfig();
     try {
-      DataExpirationConfig expirationConfig =
-          tableRuntime.getTableConfiguration().getExpiringDataConfig();
       Types.NestedField field =
           mixedTable.schema().findField(expirationConfig.getExpirationField());
       if (!TableConfigurations.isValidDataExpirationField(
@@ -130,7 +133,7 @@ public class MixedTableMaintainer implements 
TableMaintainer {
 
       expireDataFrom(expirationConfig, expireMixedBaseOnRule(expirationConfig, 
field));
     } catch (Throwable t) {
-      LOG.error("Unexpected purge error for table {} ", 
tableRuntime.getTableIdentifier(), t);
+      LOG.error("Unexpected purge error for table {} ", mixedTable.id(), t);
     }
   }
 
@@ -243,7 +246,7 @@ public class MixedTableMaintainer implements 
TableMaintainer {
   }
 
   @Override
-  public void autoCreateTags(DefaultTableRuntime tableRuntime) {
+  public void autoCreateTags() {
     throw new UnsupportedOperationException("Mixed table doesn't support auto 
create tags");
   }
 
@@ -263,11 +266,11 @@ public class MixedTableMaintainer implements 
TableMaintainer {
     baseMaintainer.cleanMetadata(lastTime, orphanFilesCleaningMetrics);
   }
 
-  protected void cleanDanglingDeleteFiles() {
+  protected void doCleanDanglingDeleteFiles() {
     if (changeMaintainer != null) {
-      changeMaintainer.cleanDanglingDeleteFiles();
+      changeMaintainer.doCleanDanglingDeleteFiles();
     }
-    baseMaintainer.cleanDanglingDeleteFiles();
+    baseMaintainer.doCleanDanglingDeleteFiles();
   }
 
   public ChangeTableMaintainer getChangeMaintainer() {
@@ -284,8 +287,8 @@ public class MixedTableMaintainer implements 
TableMaintainer {
 
     private final UnkeyedTable unkeyedTable;
 
-    public ChangeTableMaintainer(UnkeyedTable unkeyedTable) {
-      super(unkeyedTable);
+    public ChangeTableMaintainer(UnkeyedTable unkeyedTable, 
DefaultTableRuntime tableRuntime) {
+      super(unkeyedTable, mixedTable.id(), tableRuntime);
       this.unkeyedTable = unkeyedTable;
     }
 
@@ -297,7 +300,7 @@ public class MixedTableMaintainer implements 
TableMaintainer {
     }
 
     @Override
-    public void expireSnapshots(DefaultTableRuntime tableRuntime) {
+    public void expireSnapshots() {
       if (!expireSnapshotEnabled(tableRuntime)) {
         return;
       }
@@ -440,8 +443,8 @@ public class MixedTableMaintainer implements 
TableMaintainer {
   public class BaseTableMaintainer extends IcebergTableMaintainer {
     private final Set<String> hiveFiles = Sets.newHashSet();
 
-    public BaseTableMaintainer(UnkeyedTable unkeyedTable) {
-      super(unkeyedTable);
+    public BaseTableMaintainer(UnkeyedTable unkeyedTable, DefaultTableRuntime 
tableRuntime) {
+      super(unkeyedTable, mixedTable.id(), tableRuntime);
       if (unkeyedTable.format() == TableFormat.MIXED_HIVE) {
         hiveFiles.addAll(HiveLocationUtil.getHiveLocation(mixedTable));
       }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
index 4db6917a8..9474102b2 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
@@ -18,12 +18,6 @@
 
 package org.apache.amoro.server.optimizing.maintainer;
 
-import org.apache.amoro.AmoroTable;
-import org.apache.amoro.TableFormat;
-import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.table.MixedTable;
-import org.apache.iceberg.Table;
-
 /**
  * API for maintaining table.
  *
@@ -34,10 +28,10 @@ import org.apache.iceberg.Table;
 public interface TableMaintainer {
 
   /** Clean table orphan files. Includes: data files, metadata files. */
-  void cleanOrphanFiles(DefaultTableRuntime tableRuntime);
+  void cleanOrphanFiles();
 
   /** Clean table dangling delete files. */
-  default void cleanDanglingDeleteFiles(DefaultTableRuntime tableRuntime) {
+  default void cleanDanglingDeleteFiles() {
     // DO nothing by default
   }
 
@@ -45,27 +39,14 @@ public interface TableMaintainer {
    * Expire snapshots. The optimizing based on the snapshot that the current 
table relies on will
    * not expire according to TableRuntime.
    */
-  void expireSnapshots(DefaultTableRuntime tableRuntime);
+  void expireSnapshots();
 
   /**
    * Expire historical data based on the expiration field, and data that 
exceeds the retention
    * period will be purged
-   *
-   * @param tableRuntime TableRuntime
    */
-  void expireData(DefaultTableRuntime tableRuntime);
+  void expireData();
 
   /** Auto create tags for table. */
-  void autoCreateTags(DefaultTableRuntime tableRuntime);
-
-  static TableMaintainer ofTable(AmoroTable<?> amoroTable) {
-    TableFormat format = amoroTable.format();
-    if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
-      return new MixedTableMaintainer((MixedTable) amoroTable.originalTable());
-    } else if (TableFormat.ICEBERG.equals(format)) {
-      return new IcebergTableMaintainer((Table) amoroTable.originalTable());
-    } else {
-      throw new RuntimeException("Unsupported table type" + 
amoroTable.originalTable().getClass());
-    }
-  }
+  void autoCreateTags();
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
similarity index 51%
copy from 
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
copy to 
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
index 4db6917a8..41f49bb23 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
@@ -20,50 +20,26 @@ package org.apache.amoro.server.optimizing.maintainer;
 
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.table.MixedTable;
 import org.apache.iceberg.Table;
 
-/**
- * API for maintaining table.
- *
- * <p>Includes: clean content files, clean metadata, clean dangling delete 
files, expire snapshots,
- * auto create tags.
- */
-// TODO TableMaintainer should not be in this optimizing.xxx package.
-public interface TableMaintainer {
-
-  /** Clean table orphan files. Includes: data files, metadata files. */
-  void cleanOrphanFiles(DefaultTableRuntime tableRuntime);
-
-  /** Clean table dangling delete files. */
-  default void cleanDanglingDeleteFiles(DefaultTableRuntime tableRuntime) {
-    // DO nothing by default
-  }
-
-  /**
-   * Expire snapshots. The optimizing based on the snapshot that the current 
table relies on will
-   * not expire according to TableRuntime.
-   */
-  void expireSnapshots(DefaultTableRuntime tableRuntime);
-
-  /**
-   * Expire historical data based on the expiration field, and data that 
exceeds the retention
-   * period will be purged
-   *
-   * @param tableRuntime TableRuntime
-   */
-  void expireData(DefaultTableRuntime tableRuntime);
-
-  /** Auto create tags for table. */
-  void autoCreateTags(DefaultTableRuntime tableRuntime);
+/** Factory for creating {@link TableMaintainer}. */
+public class TableMaintainers {
 
-  static TableMaintainer ofTable(AmoroTable<?> amoroTable) {
+  /** Create a {@link TableMaintainer} for the given table. */
+  public static TableMaintainer create(AmoroTable<?> amoroTable, TableRuntime 
tableRuntime) {
     TableFormat format = amoroTable.format();
     if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
-      return new MixedTableMaintainer((MixedTable) amoroTable.originalTable());
+      Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+      return new MixedTableMaintainer(
+          (MixedTable) amoroTable.originalTable(), (DefaultTableRuntime) 
tableRuntime);
     } else if (TableFormat.ICEBERG.equals(format)) {
-      return new IcebergTableMaintainer((Table) amoroTable.originalTable());
+      Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+      return new IcebergTableMaintainer(
+          (Table) amoroTable.originalTable(), amoroTable.id(), 
(DefaultTableRuntime) tableRuntime);
     } else {
       throw new RuntimeException("Unsupported table type" + 
amoroTable.originalTable().getClass());
     }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
index b9ad510d3..0b5141738 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.amoro.server.scheduler;
 
 import org.apache.amoro.Action;
+import org.apache.amoro.SupportsProcessPlugins;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.process.AmoroProcess;
 import org.apache.amoro.process.ManagedProcess;
@@ -32,10 +33,11 @@ import org.apache.amoro.resource.Resource;
 import org.apache.amoro.resource.ResourceManager;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.mapper.ProcessStateMapper;
-import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 
 import java.util.List;
+import java.util.Optional;
 
 public abstract class PeriodicExternalScheduler extends PeriodicTableScheduler 
{
 
@@ -56,21 +58,31 @@ public abstract class PeriodicExternalScheduler extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected void initHandler(List<DefaultTableRuntime> tableRuntimeList) {
-    tableRuntimeList.forEach(tableRuntime -> tableRuntime.install(getAction(), 
processFactory));
+  protected void initHandler(List<TableRuntime> tableRuntimeList) {
+    tableRuntimeList.stream()
+        .filter(t -> t instanceof SupportsProcessPlugins)
+        .map(t -> (SupportsProcessPlugins) t)
+        .forEach(tableRuntime -> tableRuntime.install(getAction(), 
processFactory));
     super.initHandler(tableRuntimeList);
   }
 
   @Override
-  protected boolean enabled(DefaultTableRuntime tableRuntime) {
-    return tableRuntime.enabled(getAction());
+  protected boolean enabled(TableRuntime tableRuntime) {
+    return Optional.of(tableRuntime)
+        .filter(t -> t instanceof SupportsProcessPlugins)
+        .map(t -> (SupportsProcessPlugins) t)
+        .map(t -> t.enabled(getAction()))
+        .orElse(false);
   }
 
   @Override
-  protected void execute(DefaultTableRuntime tableRuntime) {
+  protected void execute(TableRuntime tableRuntime) {
+    Preconditions.checkArgument(tableRuntime instanceof 
SupportsProcessPlugins);
+    SupportsProcessPlugins runtimeSupportProcessPlugin = 
(SupportsProcessPlugins) tableRuntime;
     // Trigger a table process and check conflicts by table runtime
     // Update process state after process completed, the callback must be 
register first
-    AmoroProcess<? extends TableProcessState> process = 
tableRuntime.trigger(getAction());
+    AmoroProcess<? extends TableProcessState> process =
+        runtimeSupportProcessPlugin.trigger(getAction());
     process.getCompleteFuture().whenCompleted(() -> 
persistTableProcess(process));
     ManagedProcess<? extends TableProcessState> managedProcess = new 
ManagedTableProcess<>(process);
 
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
index c260cac53..f08b5110c 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
@@ -22,9 +22,9 @@ import org.apache.amoro.Action;
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.IcebergActions;
 import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
-import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.RuntimeHandlerChain;
 import org.apache.amoro.server.table.TableService;
 import 
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -78,7 +78,7 @@ public abstract class PeriodicTableScheduler extends 
RuntimeHandlerChain {
   }
 
   @Override
-  protected void initHandler(List<DefaultTableRuntime> tableRuntimeList) {
+  protected void initHandler(List<TableRuntime> tableRuntimeList) {
     tableRuntimeList.stream()
         .filter(this::enabled)
         .forEach(
@@ -92,7 +92,7 @@ public abstract class PeriodicTableScheduler extends 
RuntimeHandlerChain {
     logger.info("Table executor {} initialized", getClass().getSimpleName());
   }
 
-  private void executeTask(DefaultTableRuntime tableRuntime) {
+  private void executeTask(TableRuntime tableRuntime) {
     try {
       if (isExecutable(tableRuntime)) {
         execute(tableRuntime);
@@ -103,8 +103,7 @@ public abstract class PeriodicTableScheduler extends 
RuntimeHandlerChain {
     }
   }
 
-  protected final void scheduleIfNecessary(
-      DefaultTableRuntime tableRuntime, long millisecondsTime) {
+  protected final void scheduleIfNecessary(TableRuntime tableRuntime, long 
millisecondsTime) {
     if (isExecutable(tableRuntime)) {
       if (scheduledTables.add(tableRuntime.getTableIdentifier())) {
         executor.schedule(() -> executeTask(tableRuntime), millisecondsTime, 
TimeUnit.MILLISECONDS);
@@ -112,39 +111,37 @@ public abstract class PeriodicTableScheduler extends 
RuntimeHandlerChain {
     }
   }
 
-  protected abstract long getNextExecutingTime(DefaultTableRuntime 
tableRuntime);
+  protected abstract long getNextExecutingTime(TableRuntime tableRuntime);
 
-  protected abstract boolean enabled(DefaultTableRuntime tableRuntime);
+  protected abstract boolean enabled(TableRuntime tableRuntime);
 
-  protected abstract void execute(DefaultTableRuntime tableRuntime);
+  protected abstract void execute(TableRuntime tableRuntime);
 
   protected String getThreadName() {
     return String.join("-", 
StringUtils.splitByCharacterTypeCamelCase(getClass().getSimpleName()))
         .toLowerCase(Locale.ROOT);
   }
 
-  private boolean isExecutable(DefaultTableRuntime tableRuntime) {
+  private boolean isExecutable(TableRuntime tableRuntime) {
     return tableService.contains(tableRuntime.getTableIdentifier().getId())
         && enabled(tableRuntime);
   }
 
   @Override
-  public void handleConfigChanged(
-      DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+  public void handleConfigChanged(TableRuntime tableRuntime, 
TableConfiguration originalConfig) {
     // DO nothing by default
   }
 
   @Override
-  public void handleTableRemoved(DefaultTableRuntime tableRuntime) {
+  public void handleTableRemoved(TableRuntime tableRuntime) {
     // DO nothing, handling would be canceled when calling executeTable
   }
 
   @Override
-  public void handleStatusChanged(
-      DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {}
+  public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus 
originalStatus) {}
 
   @Override
-  public void handleTableAdded(AmoroTable<?> table, DefaultTableRuntime 
tableRuntime) {
+  public void handleTableAdded(AmoroTable<?> table, TableRuntime tableRuntime) 
{
     scheduleIfNecessary(tableRuntime, getStartDelay());
   }
 
@@ -160,7 +157,7 @@ public abstract class PeriodicTableScheduler extends 
RuntimeHandlerChain {
     return START_DELAY + getExecutorDelay();
   }
 
-  protected AmoroTable<?> loadTable(DefaultTableRuntime tableRuntime) {
+  protected AmoroTable<?> loadTable(TableRuntime tableRuntime) {
     return tableService.loadTable(tableRuntime.getTableIdentifier());
   }
 
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/BlockerExpiringExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/BlockerExpiringExecutor.java
index ec38e4d25..781081f6d 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/BlockerExpiringExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/BlockerExpiringExecutor.java
@@ -18,10 +18,10 @@
 
 package org.apache.amoro.server.scheduler.inline;
 
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableService;
 
 public class BlockerExpiringExecutor extends PeriodicTableScheduler {
@@ -35,12 +35,12 @@ public class BlockerExpiringExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+  protected long getNextExecutingTime(TableRuntime tableRuntime) {
     return INTERVAL;
   }
 
   @Override
-  protected boolean enabled(DefaultTableRuntime tableRuntime) {
+  protected boolean enabled(TableRuntime tableRuntime) {
     return true;
   }
 
@@ -50,7 +50,7 @@ public class BlockerExpiringExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected void execute(DefaultTableRuntime tableRuntime) {
+  protected void execute(TableRuntime tableRuntime) {
     try {
       persistency.doExpiring(tableRuntime);
     } catch (Throwable t) {
@@ -60,7 +60,7 @@ public class BlockerExpiringExecutor extends 
PeriodicTableScheduler {
 
   private static class Persistency extends PersistentBase {
 
-    public void doExpiring(DefaultTableRuntime tableRuntime) {
+    public void doExpiring(TableRuntime tableRuntime) {
       String catalog = tableRuntime.getTableIdentifier().getCatalog();
       String database = tableRuntime.getTableIdentifier().getDatabase();
       String table = tableRuntime.getTableIdentifier().getTableName();
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
index c19471535..16f80c9c0 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
@@ -19,8 +19,10 @@
 package org.apache.amoro.server.scheduler.inline;
 
 import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableService;
@@ -42,18 +44,18 @@ public class DanglingDeleteFilesCleaningExecutor extends 
PeriodicTableScheduler
   }
 
   @Override
-  protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+  protected long getNextExecutingTime(TableRuntime tableRuntime) {
     return INTERVAL;
   }
 
   @Override
-  protected boolean enabled(DefaultTableRuntime tableRuntime) {
-    return 
tableRuntime.getTableConfiguration().isDeleteDanglingDeleteFilesEnabled();
+  protected boolean enabled(TableRuntime tableRuntime) {
+    return tableRuntime instanceof DefaultTableRuntime
+        && 
tableRuntime.getTableConfiguration().isDeleteDanglingDeleteFilesEnabled();
   }
 
   @Override
-  public void handleConfigChanged(
-      DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+  public void handleConfigChanged(TableRuntime tableRuntime, 
TableConfiguration originalConfig) {
     scheduleIfNecessary(tableRuntime, getStartDelay());
   }
 
@@ -63,12 +65,12 @@ public class DanglingDeleteFilesCleaningExecutor extends 
PeriodicTableScheduler
   }
 
   @Override
-  protected void execute(DefaultTableRuntime tableRuntime) {
+  protected void execute(TableRuntime tableRuntime) {
     try {
       LOG.info("{} start cleaning dangling delete files", 
tableRuntime.getTableIdentifier());
       AmoroTable<?> amoroTable = loadTable(tableRuntime);
-      TableMaintainer tableMaintainer = TableMaintainer.ofTable(amoroTable);
-      tableMaintainer.cleanDanglingDeleteFiles(tableRuntime);
+      TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, 
tableRuntime);
+      tableMaintainer.cleanDanglingDeleteFiles();
     } catch (Throwable t) {
       LOG.error("{} failed to clean dangling delete file", 
tableRuntime.getTableIdentifier(), t);
     }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
index 2f31b1660..4990b7409 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
@@ -19,10 +19,11 @@
 package org.apache.amoro.server.scheduler.inline;
 
 import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,18 +43,17 @@ public class DataExpiringExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+  protected long getNextExecutingTime(TableRuntime tableRuntime) {
     return interval.toMillis();
   }
 
   @Override
-  protected boolean enabled(DefaultTableRuntime tableRuntime) {
+  protected boolean enabled(TableRuntime tableRuntime) {
     return 
tableRuntime.getTableConfiguration().getExpiringDataConfig().isEnabled();
   }
 
   @Override
-  public void handleConfigChanged(
-      DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+  public void handleConfigChanged(TableRuntime tableRuntime, 
TableConfiguration originalConfig) {
     scheduleIfNecessary(tableRuntime, getStartDelay());
   }
 
@@ -63,11 +63,11 @@ public class DataExpiringExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected void execute(DefaultTableRuntime tableRuntime) {
+  protected void execute(TableRuntime tableRuntime) {
     try {
       AmoroTable<?> amoroTable = loadTable(tableRuntime);
-      TableMaintainer tableMaintainer = TableMaintainer.ofTable(amoroTable);
-      tableMaintainer.expireData(tableRuntime);
+      TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, 
tableRuntime);
+      tableMaintainer.expireData();
     } catch (Throwable t) {
       LOG.error("unexpected expire error of table {} ", 
tableRuntime.getTableIdentifier(), t);
     }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/HiveCommitSyncExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/HiveCommitSyncExecutor.java
index d5251905c..6bc46a382 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/HiveCommitSyncExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/HiveCommitSyncExecutor.java
@@ -19,11 +19,11 @@
 package org.apache.amoro.server.scheduler.inline;
 
 import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.hive.table.SupportHive;
 import org.apache.amoro.hive.utils.HiveMetaSynchronizer;
 import org.apache.amoro.hive.utils.TableTypeUtil;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableService;
 import org.apache.amoro.table.MixedTable;
 import org.slf4j.Logger;
@@ -42,12 +42,12 @@ public class HiveCommitSyncExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+  protected long getNextExecutingTime(TableRuntime tableRuntime) {
     return INTERVAL;
   }
 
   @Override
-  protected boolean enabled(DefaultTableRuntime tableRuntime) {
+  protected boolean enabled(TableRuntime tableRuntime) {
     return true;
   }
 
@@ -57,7 +57,7 @@ public class HiveCommitSyncExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected void execute(DefaultTableRuntime tableRuntime) {
+  protected void execute(TableRuntime tableRuntime) {
     long startTime = System.currentTimeMillis();
     ServerTableIdentifier tableIdentifier = tableRuntime.getTableIdentifier();
     try {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingCommitExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingCommitExecutor.java
index 67c2e7670..056314273 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingCommitExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingCommitExecutor.java
@@ -18,7 +18,7 @@
 
 package org.apache.amoro.server.scheduler.inline;
 
-import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.DefaultTableRuntime;
@@ -35,13 +35,17 @@ public class OptimizingCommitExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+  protected long getNextExecutingTime(TableRuntime tableRuntime) {
     return INTERVAL;
   }
 
   @Override
-  protected boolean enabled(DefaultTableRuntime tableRuntime) {
-    return tableRuntime.getOptimizingState().getOptimizingStatus() == 
OptimizingStatus.COMMITTING;
+  protected boolean enabled(TableRuntime tableRuntime) {
+    return Optional.of(tableRuntime)
+        .filter(t -> t instanceof DefaultTableRuntime)
+        .map(t -> (DefaultTableRuntime) t)
+        .map(t -> t.getOptimizingState().getOptimizingStatus() == 
OptimizingStatus.COMMITTING)
+        .orElse(false);
   }
 
   @Override
@@ -50,8 +54,11 @@ public class OptimizingCommitExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected void execute(DefaultTableRuntime tableRuntime) {
-    
Optional.ofNullable(tableRuntime.getOptimizingState().getOptimizingProcess())
+  protected void execute(TableRuntime tableRuntime) {
+    Optional.of(tableRuntime)
+        .filter(t -> t instanceof DefaultTableRuntime)
+        .map(t -> (DefaultTableRuntime) t)
+        .map(t -> t.getOptimizingState().getOptimizingProcess())
         .orElseThrow(
             () ->
                 new IllegalStateException(
@@ -60,14 +67,10 @@ public class OptimizingCommitExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  public void handleStatusChanged(
-      DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {
+  public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus 
originalStatus) {
     scheduleIfNecessary(tableRuntime, getStartDelay());
   }
 
-  @Override
-  public void handleTableAdded(AmoroTable<?> table, DefaultTableRuntime 
tableRuntime) {}
-
   protected long getStartDelay() {
     return 0;
   }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
index ad8e71b83..77f610b7c 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
@@ -18,10 +18,10 @@
 
 package org.apache.amoro.server.scheduler.inline;
 
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableService;
 import org.apache.amoro.server.utils.SnowflakeIdGenerator;
 import org.slf4j.Logger;
@@ -41,12 +41,12 @@ public class OptimizingExpiringExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+  protected long getNextExecutingTime(TableRuntime tableRuntime) {
     return interval;
   }
 
   @Override
-  protected boolean enabled(DefaultTableRuntime tableRuntime) {
+  protected boolean enabled(TableRuntime tableRuntime) {
     return true;
   }
 
@@ -56,7 +56,7 @@ public class OptimizingExpiringExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected void execute(DefaultTableRuntime tableRuntime) {
+  protected void execute(TableRuntime tableRuntime) {
     try {
       persistency.doExpiring(tableRuntime);
     } catch (Throwable throwable) {
@@ -66,7 +66,7 @@ public class OptimizingExpiringExecutor extends 
PeriodicTableScheduler {
   }
 
   private class Persistency extends PersistentBase {
-    public void doExpiring(DefaultTableRuntime tableRuntime) {
+    public void doExpiring(TableRuntime tableRuntime) {
       long expireTime = System.currentTimeMillis() - keepTime;
       long minProcessId = SnowflakeIdGenerator.getMinSnowflakeId(expireTime);
       doAsTransaction(
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
index e7512e4b5..332c51141 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
@@ -19,10 +19,11 @@
 package org.apache.amoro.server.scheduler.inline;
 
 import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,18 +41,17 @@ public class OrphanFilesCleaningExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+  protected long getNextExecutingTime(TableRuntime tableRuntime) {
     return interval.toMillis();
   }
 
   @Override
-  protected boolean enabled(DefaultTableRuntime tableRuntime) {
+  protected boolean enabled(TableRuntime tableRuntime) {
     return tableRuntime.getTableConfiguration().isCleanOrphanEnabled();
   }
 
   @Override
-  public void handleConfigChanged(
-      DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+  public void handleConfigChanged(TableRuntime tableRuntime, 
TableConfiguration originalConfig) {
     scheduleIfNecessary(tableRuntime, getStartDelay());
   }
 
@@ -61,12 +61,12 @@ public class OrphanFilesCleaningExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  public void execute(DefaultTableRuntime tableRuntime) {
+  public void execute(TableRuntime tableRuntime) {
     try {
       LOG.info("{} start cleaning orphan files", 
tableRuntime.getTableIdentifier());
       AmoroTable<?> amoroTable = loadTable(tableRuntime);
-      TableMaintainer tableMaintainer = TableMaintainer.ofTable(amoroTable);
-      tableMaintainer.cleanOrphanFiles(tableRuntime);
+      TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, 
tableRuntime);
+      tableMaintainer.cleanOrphanFiles();
     } catch (Throwable t) {
       LOG.error("{} failed to clean orphan file", 
tableRuntime.getTableIdentifier(), t);
     }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
index d465023bd..f7d0cb927 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
@@ -19,10 +19,11 @@
 package org.apache.amoro.server.scheduler.inline;
 
 import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,18 +41,17 @@ public class SnapshotsExpiringExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+  protected long getNextExecutingTime(TableRuntime tableRuntime) {
     return INTERVAL;
   }
 
   @Override
-  protected boolean enabled(DefaultTableRuntime tableRuntime) {
+  protected boolean enabled(TableRuntime tableRuntime) {
     return tableRuntime.getTableConfiguration().isExpireSnapshotEnabled();
   }
 
   @Override
-  public void handleConfigChanged(
-      DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+  public void handleConfigChanged(TableRuntime tableRuntime, 
TableConfiguration originalConfig) {
     scheduleIfNecessary(tableRuntime, getStartDelay());
   }
 
@@ -61,11 +61,11 @@ public class SnapshotsExpiringExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  public void execute(DefaultTableRuntime tableRuntime) {
+  public void execute(TableRuntime tableRuntime) {
     try {
       AmoroTable<?> amoroTable = loadTable(tableRuntime);
-      TableMaintainer tableMaintainer = TableMaintainer.ofTable(amoroTable);
-      tableMaintainer.expireSnapshots(tableRuntime);
+      TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, 
tableRuntime);
+      tableMaintainer.expireSnapshots();
     } catch (Throwable t) {
       LOG.error("unexpected expire error of table {} ", 
tableRuntime.getTableIdentifier(), t);
     }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
index d9557aba0..195f14213 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.amoro.server.scheduler.inline;
 
 import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
 import org.apache.amoro.process.ProcessStatus;
@@ -28,6 +29,7 @@ import org.apache.amoro.server.table.DefaultOptimizingState;
 import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableService;
 import org.apache.amoro.server.utils.IcebergTableUtil;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.table.MixedTable;
 
 /** Executor that refreshes table runtimes and evaluates optimizing status 
periodically. */
@@ -45,13 +47,17 @@ public class TableRuntimeRefreshExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected boolean enabled(DefaultTableRuntime tableRuntime) {
-    return true;
+  protected boolean enabled(TableRuntime tableRuntime) {
+    return tableRuntime instanceof DefaultTableRuntime;
   }
 
-  protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+  @Override
+  protected long getNextExecutingTime(TableRuntime tableRuntime) {
+    DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) 
tableRuntime;
     return Math.min(
-        
tableRuntime.getOptimizingState().getOptimizingConfig().getMinorLeastInterval() 
* 4L / 5,
+        
defaultTableRuntime.getOptimizingState().getOptimizingConfig().getMinorLeastInterval()
+            * 4L
+            / 5,
         interval);
   }
 
@@ -77,13 +83,14 @@ public class TableRuntimeRefreshExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  public void handleConfigChanged(
-      DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+  public void handleConfigChanged(TableRuntime tableRuntime, 
TableConfiguration originalConfig) {
+    Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+    DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) 
tableRuntime;
     // After disabling self-optimizing, close the currently running optimizing 
process.
     if (originalConfig.getOptimizingConfig().isEnabled()
         && 
!tableRuntime.getTableConfiguration().getOptimizingConfig().isEnabled()) {
       OptimizingProcess optimizingProcess =
-          tableRuntime.getOptimizingState().getOptimizingProcess();
+          defaultTableRuntime.getOptimizingState().getOptimizingProcess();
       if (optimizingProcess != null && optimizingProcess.getStatus() == 
ProcessStatus.RUNNING) {
         optimizingProcess.close();
       }
@@ -96,9 +103,12 @@ public class TableRuntimeRefreshExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  public void execute(DefaultTableRuntime tableRuntime) {
+  public void execute(TableRuntime tableRuntime) {
     try {
-      DefaultOptimizingState optimizingState = 
tableRuntime.getOptimizingState();
+      Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+      DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) 
tableRuntime;
+
+      DefaultOptimizingState optimizingState = 
defaultTableRuntime.getOptimizingState();
       long lastOptimizedSnapshotId = 
optimizingState.getLastOptimizedSnapshotId();
       long lastOptimizedChangeSnapshotId = 
optimizingState.getLastOptimizedChangeSnapshotId();
       AmoroTable<?> table = loadTable(tableRuntime);
@@ -109,7 +119,7 @@ public class TableRuntimeRefreshExecutor extends 
PeriodicTableScheduler {
                   || lastOptimizedChangeSnapshotId != 
optimizingState.getCurrentChangeSnapshotId()))
           || (mixedTable.isUnkeyedTable()
               && lastOptimizedSnapshotId != 
optimizingState.getCurrentSnapshotId())) {
-        tryEvaluatingPendingInput(tableRuntime, mixedTable);
+        tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
       }
     } catch (Throwable throwable) {
       logger.error("Refreshing table {} failed.", 
tableRuntime.getTableIdentifier(), throwable);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
index 99a06fb38..5207f105a 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
@@ -20,10 +20,11 @@ package org.apache.amoro.server.scheduler.inline;
 
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,12 +43,12 @@ public class TagsAutoCreatingExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+  protected long getNextExecutingTime(TableRuntime tableRuntime) {
     return interval;
   }
 
   @Override
-  protected boolean enabled(DefaultTableRuntime tableRuntime) {
+  protected boolean enabled(TableRuntime tableRuntime) {
     return 
tableRuntime.getTableConfiguration().getTagConfiguration().isAutoCreateTag()
         && tableRuntime.getFormat() == TableFormat.ICEBERG;
   }
@@ -58,19 +59,18 @@ public class TagsAutoCreatingExecutor extends 
PeriodicTableScheduler {
   }
 
   @Override
-  protected void execute(DefaultTableRuntime tableRuntime) {
+  protected void execute(TableRuntime tableRuntime) {
     try {
       AmoroTable<?> amoroTable = loadTable(tableRuntime);
-      TableMaintainer tableMaintainer = TableMaintainer.ofTable(amoroTable);
-      tableMaintainer.autoCreateTags(tableRuntime);
+      TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, 
tableRuntime);
+      tableMaintainer.autoCreateTags();
     } catch (Throwable t) {
       LOG.error("Failed to create tags on {}", 
tableRuntime.getTableIdentifier(), t);
     }
   }
 
   @Override
-  public void handleConfigChanged(
-      DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+  public void handleConfigChanged(TableRuntime tableRuntime, 
TableConfiguration originalConfig) {
     scheduleIfNecessary(tableRuntime, getStartDelay());
   }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index 5ab6b0207..c06665df6 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -87,6 +87,7 @@ public class DefaultTableRuntime extends StatedPersistentBase
     optimizingState.registerMetric(metricRegistry);
   }
 
+  @Override
   public void dispose() {
     optimizingState.dispose();
   }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index 2c6d3d014..bb44b785a 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -22,10 +22,10 @@ import org.apache.amoro.AmoroTable;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableIDWithFormat;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.api.CatalogMeta;
 import org.apache.amoro.config.Configurations;
 import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.exception.ObjectNotExistsException;
 import org.apache.amoro.server.AmoroManagementConf;
 import org.apache.amoro.server.catalog.CatalogManager;
 import org.apache.amoro.server.catalog.ExternalCatalog;
@@ -39,7 +39,6 @@ import 
org.apache.amoro.server.persistence.mapper.TableMetaMapper;
 import 
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
 import org.apache.amoro.shade.guava32.com.google.common.base.Objects;
-import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
 import 
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -70,7 +69,7 @@ public class DefaultTableService extends PersistentBase 
implements TableService
   public static final Logger LOG = 
LoggerFactory.getLogger(DefaultTableService.class);
   private final long externalCatalogRefreshingInterval;
 
-  private final Map<Long, DefaultTableRuntime> tableRuntimeMap = new 
ConcurrentHashMap<>();
+  private final Map<Long, TableRuntime> tableRuntimeMap = new 
ConcurrentHashMap<>();
 
   private final ScheduledExecutorService tableExplorerScheduler =
       Executors.newSingleThreadScheduledExecutor(
@@ -149,7 +148,7 @@ public class DefaultTableService extends PersistentBase 
implements TableService
 
     List<TableRuntimeMeta> tableRuntimeMetaList =
         getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas);
-    List<DefaultTableRuntime> tableRuntimes = new 
ArrayList<>(tableRuntimeMetaList.size());
+    List<TableRuntime> tableRuntimes = new 
ArrayList<>(tableRuntimeMetaList.size());
     tableRuntimeMetaList.forEach(
         tableRuntimeMeta -> {
           DefaultTableRuntime tableRuntime = new 
DefaultTableRuntime(tableRuntimeMeta, this);
@@ -184,17 +183,8 @@ public class DefaultTableService extends PersistentBase 
implements TableService
     initialized.complete(true);
   }
 
-  private DefaultTableRuntime getAndCheckExist(ServerTableIdentifier 
tableIdentifier) {
-    Preconditions.checkArgument(tableIdentifier != null, "tableIdentifier 
cannot be null");
-    DefaultTableRuntime tableRuntime = getRuntime(tableIdentifier.getId());
-    if (tableRuntime == null) {
-      throw new ObjectNotExistsException(tableIdentifier);
-    }
-    return tableRuntime;
-  }
-
   @Override
-  public DefaultTableRuntime getRuntime(Long tableId) {
+  public TableRuntime getRuntime(Long tableId) {
     checkStarted();
     return tableRuntimeMap.get(tableId);
   }
@@ -261,7 +251,7 @@ public class DefaultTableService extends PersistentBase 
implements TableService
         catalogManager.listCatalogMetas().stream()
             .map(CatalogMeta::getCatalogName)
             .collect(Collectors.toSet());
-    for (DefaultTableRuntime tableRuntime : tableRuntimeMap.values()) {
+    for (TableRuntime tableRuntime : tableRuntimeMap.values()) {
       if 
(!catalogNames.contains(tableRuntime.getTableIdentifier().getCatalog())) {
         disposeTable(tableRuntime.getTableIdentifier());
       }
@@ -476,7 +466,7 @@ public class DefaultTableService extends PersistentBase 
implements TableService
 
   @VisibleForTesting
   public void disposeTable(ServerTableIdentifier tableIdentifier) {
-    DefaultTableRuntime existedTableRuntime = 
tableRuntimeMap.get(tableIdentifier.getId());
+    TableRuntime existedTableRuntime = 
tableRuntimeMap.get(tableIdentifier.getId());
     try {
       doAsTransaction(
           () ->
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java
index 0ab56c4a2..148d03d76 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java
@@ -20,6 +20,7 @@ package org.apache.amoro.server.table;
 
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
@@ -51,8 +52,8 @@ public abstract class RuntimeHandlerChain {
     }
   }
 
-  public final void initialize(List<DefaultTableRuntime> tableRuntimes) {
-    List<DefaultTableRuntime> supportedtableRuntimeList =
+  public final void initialize(List<TableRuntime> tableRuntimes) {
+    List<TableRuntime> supportedtableRuntimeList =
         tableRuntimes.stream()
             .filter(runtime -> formatSupported(runtime.getFormat()))
             .collect(Collectors.toList());
@@ -63,8 +64,7 @@ public abstract class RuntimeHandlerChain {
     }
   }
 
-  public final void fireStatusChanged(
-      DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {
+  public final void fireStatusChanged(TableRuntime tableRuntime, 
OptimizingStatus originalStatus) {
     if (!initialized) {
       return;
     }
@@ -77,7 +77,7 @@ public abstract class RuntimeHandlerChain {
   }
 
   public final void fireConfigChanged(
-      DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+      TableRuntime tableRuntime, TableConfiguration originalConfig) {
     if (!initialized) {
       return;
     }
@@ -90,7 +90,7 @@ public abstract class RuntimeHandlerChain {
     }
   }
 
-  public final void fireTableAdded(AmoroTable<?> table, DefaultTableRuntime 
tableRuntime) {
+  public final void fireTableAdded(AmoroTable<?> table, TableRuntime 
tableRuntime) {
     if (!initialized) {
       return;
     }
@@ -103,7 +103,7 @@ public abstract class RuntimeHandlerChain {
     }
   }
 
-  public final void fireTableRemoved(DefaultTableRuntime tableRuntime) {
+  public final void fireTableRemoved(TableRuntime tableRuntime) {
     if (!initialized) {
       return;
     }
@@ -138,16 +138,16 @@ public abstract class RuntimeHandlerChain {
   }
 
   protected abstract void handleStatusChanged(
-      DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus);
+      TableRuntime tableRuntime, OptimizingStatus originalStatus);
 
   protected abstract void handleConfigChanged(
-      DefaultTableRuntime tableRuntime, TableConfiguration originalConfig);
+      TableRuntime tableRuntime, TableConfiguration originalConfig);
 
-  protected abstract void handleTableAdded(AmoroTable<?> table, 
DefaultTableRuntime tableRuntime);
+  protected abstract void handleTableAdded(AmoroTable<?> table, TableRuntime 
tableRuntime);
 
-  protected abstract void handleTableRemoved(DefaultTableRuntime tableRuntime);
+  protected abstract void handleTableRemoved(TableRuntime tableRuntime);
 
-  protected abstract void initHandler(List<DefaultTableRuntime> 
tableRuntimeList);
+  protected abstract void initHandler(List<TableRuntime> tableRuntimeList);
 
   protected abstract void doDispose();
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java
index 68c1ff8f7..ae711639f 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java
@@ -20,6 +20,7 @@ package org.apache.amoro.server.table;
 
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.server.catalog.InternalCatalog;
 
 public interface TableService extends TableRuntimeHandler {
@@ -32,7 +33,7 @@ public interface TableService extends TableRuntimeHandler {
 
   void onTableDropped(InternalCatalog catalog, ServerTableIdentifier 
identifier);
 
-  DefaultTableRuntime getRuntime(Long tableId);
+  TableRuntime getRuntime(Long tableId);
 
   default boolean contains(Long tableId) {
     return getRuntime(tableId) != null;
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
index 175772cb2..15f7009b0 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
@@ -22,6 +22,7 @@ import org.apache.amoro.config.Configurations;
 import org.apache.amoro.resource.ResourceGroup;
 import org.apache.amoro.server.manager.EventsManager;
 import org.apache.amoro.server.manager.MetricManager;
+import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.DefaultTableService;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -68,6 +69,10 @@ public abstract class AMSServiceTestBase extends 
AMSManagerTestBase {
     return TABLE_SERVICE;
   }
 
+  protected DefaultTableRuntime getDefaultTableRuntime(long tableId) {
+    return (DefaultTableRuntime) tableService().getRuntime(tableId);
+  }
+
   protected DefaultOptimizingService optimizingService() {
     return OPTIMIZING_SERVICE;
   }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java
index 5e324a47c..5a3c4aa82 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java
@@ -21,11 +21,11 @@ package org.apache.amoro.server;
 import org.apache.amoro.BasicTableTestHelper;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.api.CatalogMeta;
 import org.apache.amoro.properties.CatalogMetaProperties;
 import org.apache.amoro.server.catalog.CatalogManager;
 import org.apache.amoro.server.catalog.InternalCatalog;
-import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableMetadata;
 import org.apache.amoro.server.table.TableService;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
@@ -118,13 +118,13 @@ public abstract class RestCatalogServiceTestBase {
     return internalCatalog.loadTableMetadata(identifier.getDatabase(), 
identifier.getTableName());
   }
 
-  protected DefaultTableRuntime getTableRuntime(TableIdentifier identifier) {
+  protected TableRuntime getTableRuntime(TableIdentifier identifier) {
     ServerTableIdentifier serverTableIdentifier = 
getServerTableIdentifier(identifier);
     return tableService.getRuntime(serverTableIdentifier.getId());
   }
 
   protected void assertTableRuntime(TableIdentifier identifier, TableFormat 
format) {
-    DefaultTableRuntime runtime = getTableRuntime(identifier);
+    TableRuntime runtime = getTableRuntime(identifier);
     Assertions.assertNotNull(runtime, "table runtime is not exists after 
created");
     Assertions.assertEquals(format, runtime.getFormat());
   }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 9c6051d44..2e1d0b8f4 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -128,7 +128,7 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
         (MixedTable) 
tableService().loadTable(serverTableIdentifier()).originalTable();
     appendData(mixedTable.asUnkeyedTable(), 1);
     appendData(mixedTable.asUnkeyedTable(), 2);
-    DefaultTableRuntime runtime = 
tableService().getRuntime(serverTableIdentifier().getId());
+    DefaultTableRuntime runtime = 
getDefaultTableRuntime(serverTableIdentifier().getId());
 
     
runtime.getOptimizingState().refresh(tableService().loadTable(serverTableIdentifier()));
   }
@@ -384,14 +384,12 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
     reload();
     // Committing process will be closed when reloading
     Assertions.assertNull(
-        tableService()
-            .getRuntime(serverTableIdentifier().getId())
+        getDefaultTableRuntime(serverTableIdentifier().getId())
             .getOptimizingState()
             .getOptimizingProcess());
     Assertions.assertEquals(
         OptimizingStatus.IDLE,
-        tableService()
-            .getRuntime(serverTableIdentifier().getId())
+        getDefaultTableRuntime(serverTableIdentifier().getId())
             .getOptimizingState()
             .getOptimizingStatus());
   }
@@ -748,15 +746,13 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
     }
     Assertions.assertEquals(
         ProcessStatus.RUNNING,
-        tableService()
-            .getRuntime(serverTableIdentifier().getId())
+        getDefaultTableRuntime(serverTableIdentifier().getId())
             .getOptimizingState()
             .getOptimizingProcess()
             .getStatus());
     Assertions.assertEquals(
         OptimizingStatus.COMMITTING,
-        tableService()
-            .getRuntime(serverTableIdentifier().getId())
+        getDefaultTableRuntime(serverTableIdentifier().getId())
             .getOptimizingState()
             .getOptimizingStatus());
   }
@@ -783,7 +779,7 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
     }
 
     void refreshPending() {
-      execute(tableService().getRuntime(serverTableIdentifier().getId()));
+      execute(getDefaultTableRuntime(serverTableIdentifier().getId()));
     }
   }
 
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java
index 1cf06a5d6..2bb77983a 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java
@@ -91,7 +91,7 @@ public class TestOverviewManager extends AMSTableTestBase {
             .asUnkeyedTable();
     appendData(table, 1);
     appendData(table, 2);
-    DefaultTableRuntime runtime = 
tableService().getRuntime(serverTableIdentifier().getId());
+    DefaultTableRuntime runtime = 
getDefaultTableRuntime(serverTableIdentifier().getId());
     
runtime.getOptimizingState().refresh(tableService().loadTable(serverTableIdentifier()));
   }
 
@@ -109,7 +109,7 @@ public class TestOverviewManager extends AMSTableTestBase {
   void refreshPending() {
     TableRuntimeRefreshExecutor refresher =
         new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, 
Integer.MAX_VALUE);
-    
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
+    refresher.execute(getDefaultTableRuntime(serverTableIdentifier().getId()));
     refresher.dispose();
   }
 
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
index ebec3c8f9..9e10d94c3 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
@@ -242,7 +242,7 @@ public class TestDataExpire extends ExecutorTestBase {
 
     // expire partitions that order than 2022-01-02 18:00:00.000
     DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(keyedTable);
+    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(keyedTable, null);
     tableMaintainer.expireDataFrom(
         config,
         LocalDateTime.parse("2022-01-03T18:00:00.000")
@@ -325,7 +325,7 @@ public class TestDataExpire extends ExecutorTestBase {
 
     // expire partitions that order than 2022-01-02 18:00:00.000
     DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
-    MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(getMixedTable());
+    MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(getMixedTable(), null);
     mixedTableMaintainer.expireDataFrom(
         config,
         LocalDateTime.parse("2022-01-03T18:00:00.000")
@@ -394,14 +394,15 @@ public class TestDataExpire extends ExecutorTestBase {
 
     if (getTestFormat().equals(TableFormat.ICEBERG)) {
       Table table = getMixedTable().asUnkeyedTable();
-      IcebergTableMaintainer icebergTableMaintainer = new 
IcebergTableMaintainer(table);
+      IcebergTableMaintainer icebergTableMaintainer =
+          new IcebergTableMaintainer(table, getMixedTable().id(), null);
       Types.NestedField field = 
table.schema().findField(config.getExpirationField());
       long lastSnapshotTime = table.currentSnapshot().timestampMillis();
       long lastCommitTime = icebergTableMaintainer.expireBaseOnRule(config, 
field).toEpochMilli();
       Assert.assertEquals(lastSnapshotTime, lastCommitTime);
     } else {
       MixedTable mixedTable = getMixedTable();
-      MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(mixedTable);
+      MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(mixedTable, null);
       Types.NestedField field = 
getMixedTable().schema().findField(config.getExpirationField());
 
       long lastSnapshotTime;
@@ -427,7 +428,8 @@ public class TestDataExpire extends ExecutorTestBase {
   protected void getMaintainerAndExpire(DataExpirationConfig config, String 
datetime) {
     if (getTestFormat().equals(TableFormat.ICEBERG)) {
       Table table = getMixedTable().asUnkeyedTable();
-      IcebergTableMaintainer icebergTableMaintainer = new 
IcebergTableMaintainer(table);
+      IcebergTableMaintainer icebergTableMaintainer =
+          new IcebergTableMaintainer(table, getMixedTable().id(), null);
       Types.NestedField field = 
table.schema().findField(config.getExpirationField());
       icebergTableMaintainer.expireDataFrom(
           config,
@@ -439,7 +441,7 @@ public class TestDataExpire extends ExecutorTestBase {
                           
getMixedTable().schema().findField(config.getExpirationField())))
                   .toInstant());
     } else {
-      MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(getMixedTable());
+      MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(getMixedTable(), null);
       Types.NestedField field = 
getMixedTable().schema().findField(config.getExpirationField());
       mixedTableMaintainer.expireDataFrom(
           config,
@@ -501,7 +503,7 @@ public class TestDataExpire extends ExecutorTestBase {
     assertScanResult(scan, 1, 0);
 
     DataExpirationConfig config = parseDataExpirationConfig(testTable);
-    MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(getMixedTable());
+    MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(getMixedTable(), null);
     mixedTableMaintainer.expireDataFrom(
         config,
         LocalDateTime.parse("2024-01-01T00:00:00.000")
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
index 5df5b47b1..df472d4fd 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
@@ -120,7 +120,7 @@ public class TestOrphanFileClean extends ExecutorTestBase {
                 tableIdentifier.getDatabase(),
                 tableIdentifier.getTableName(),
                 getTestFormat()));
-    MixedTableMaintainer maintainer = new 
MixedTableMaintainer(getMixedTable());
+    MixedTableMaintainer maintainer = new 
MixedTableMaintainer(getMixedTable(), null);
     maintainer.cleanContentFiles(
         System.currentTimeMillis()
             - TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME_DEFAULT * 60 * 
1000,
@@ -204,7 +204,7 @@ public class TestOrphanFileClean extends ExecutorTestBase {
       
Assert.assertTrue(getMixedTable().io().exists(changeInvalidMetadataJson));
     }
 
-    MixedTableMaintainer maintainer = new 
MixedTableMaintainer(getMixedTable());
+    MixedTableMaintainer maintainer = new 
MixedTableMaintainer(getMixedTable(), null);
     TableIdentifier tableIdentifier = getMixedTable().id();
     TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
         new TableOrphanFilesCleaningMetrics(
@@ -297,7 +297,7 @@ public class TestOrphanFileClean extends ExecutorTestBase {
       
Assert.assertTrue(getMixedTable().io().exists(changeInvalidMetadataJson));
     }
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(getMixedTable());
+    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(getMixedTable(), null);
     TableIdentifier tableIdentifier = getMixedTable().id();
     TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
         new TableOrphanFilesCleaningMetrics(
@@ -346,9 +346,9 @@ public class TestOrphanFileClean extends ExecutorTestBase {
     Assert.assertTrue(unkeyedTable.io().exists(file1.path()));
     Assert.assertTrue(unkeyedTable.io().exists(file2.path()));
     Assert.assertTrue(unkeyedTable.io().exists(file3.path()));
-    new MixedTableMaintainer(getMixedTable())
+    new MixedTableMaintainer(getMixedTable(), null)
         .cleanContentFiles(System.currentTimeMillis() + 1, 
orphanFilesCleaningMetrics);
-    new MixedTableMaintainer(getMixedTable())
+    new MixedTableMaintainer(getMixedTable(), null)
         .cleanMetadata(System.currentTimeMillis() + 1, 
orphanFilesCleaningMetrics);
     Assert.assertTrue(unkeyedTable.io().exists(file1.path()));
     Assert.assertTrue(unkeyedTable.io().exists(file2.path()));
@@ -411,8 +411,8 @@ public class TestOrphanFileClean extends ExecutorTestBase {
             new TableOrphanFilesCleaningMetrics(
                 ServerTableIdentifier.of(baseTable.id(), getTestFormat())));
 
-    MixedTableMaintainer maintainer = new 
MixedTableMaintainer(getMixedTable());
-    maintainer.cleanOrphanFiles(tableRuntime);
+    MixedTableMaintainer maintainer = new 
MixedTableMaintainer(getMixedTable(), tableRuntime);
+    maintainer.cleanOrphanFiles();
 
     Assert.assertTrue(getMixedTable().io().exists(baseOrphanFileDir));
     Assert.assertTrue(getMixedTable().io().exists(baseOrphanFilePath));
@@ -420,7 +420,7 @@ public class TestOrphanFileClean extends ExecutorTestBase {
     baseTable.updateProperties().set("gc.enabled", "true").commit();
     Mockito.when(tableRuntime.getTableConfiguration())
         
.thenReturn(TableConfigurations.parseTableConfig((baseTable.properties())));
-    maintainer.cleanOrphanFiles(tableRuntime);
+    maintainer.cleanOrphanFiles();
 
     Assert.assertFalse(getMixedTable().io().exists(baseOrphanFileDir));
     Assert.assertFalse(getMixedTable().io().exists(baseOrphanFilePath));
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
index 8a818a4d5..e55f33ee8 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
@@ -84,7 +84,7 @@ public class TestOrphanFileCleanHive extends 
TestOrphanFileClean {
     changeOrphanDataFile.createOrOverwrite().close();
     Assert.assertTrue(getMixedTable().io().exists(hiveOrphanFilePath));
 
-    MixedTableMaintainer maintainer = new 
MixedTableMaintainer(getMixedTable());
+    MixedTableMaintainer maintainer = new 
MixedTableMaintainer(getMixedTable(), null);
     TableIdentifier tableIdentifier = getMixedTable().id();
     TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
         new TableOrphanFilesCleaningMetrics(
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
index baca500dc..60bbd4853 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
@@ -100,8 +100,9 @@ public class TestOrphanFileCleanIceberg extends 
TestOrphanFileClean {
         .commit();
     assertDanglingDeleteFiles(testTable, 1);
 
-    IcebergTableMaintainer tableMaintainer = new 
IcebergTableMaintainer(testTable);
-    tableMaintainer.cleanDanglingDeleteFiles();
+    IcebergTableMaintainer tableMaintainer =
+        new IcebergTableMaintainer(testTable, testTable.id(), null);
+    tableMaintainer.doCleanDanglingDeleteFiles();
 
     assertDanglingDeleteFiles(testTable, 0);
   }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
index d5326dfab..4d1c0a6d7 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
@@ -116,7 +116,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
         file ->
             
Assert.assertTrue(testKeyedTable.changeTable().io().exists(file.path().toString())));
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testKeyedTable);
+    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testKeyedTable, null);
     tableMaintainer.getChangeMaintainer().expireFiles(l + 1);
 
     // In order to advance the snapshot
@@ -180,8 +180,8 @@ public class TestSnapshotExpire extends ExecutorTestBase {
 
     Assert.assertEquals(5, 
Iterables.size(testKeyedTable.changeTable().snapshots()));
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testKeyedTable);
-    tableMaintainer.expireSnapshots(tableRuntime);
+    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testKeyedTable, tableRuntime);
+    tableMaintainer.expireSnapshots();
 
     Assert.assertEquals(2, 
Iterables.size(testKeyedTable.changeTable().snapshots()));
     List<Snapshot> expectedSnapshots = new ArrayList<>();
@@ -225,8 +225,8 @@ public class TestSnapshotExpire extends ExecutorTestBase {
 
     Assert.assertEquals(4, Iterables.size(table.snapshots()));
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(getMixedTable());
-    tableMaintainer.expireSnapshots(tableRuntime);
+    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(getMixedTable(), tableRuntime);
+    tableMaintainer.expireSnapshots();
 
     Assert.assertEquals(2, Iterables.size(table.snapshots()));
     List<Snapshot> expectedSnapshots = new ArrayList<>();
@@ -268,8 +268,8 @@ public class TestSnapshotExpire extends ExecutorTestBase {
 
     Assert.assertEquals(4, Iterables.size(table.snapshots()));
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(getMixedTable());
-    tableMaintainer.expireSnapshots(tableRuntime);
+    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(getMixedTable(), tableRuntime);
+    tableMaintainer.expireSnapshots();
 
     Assert.assertEquals(2, Iterables.size(table.snapshots()));
     List<Snapshot> expectedSnapshots = new ArrayList<>();
@@ -299,7 +299,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     Mockito.when(tableRuntime.getTableConfiguration())
         .thenReturn(TableConfigurations.parseTableConfig(table.properties()));
 
-    new MixedTableMaintainer(table).expireSnapshots(tableRuntime);
+    new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
     Assert.assertEquals(1, Iterables.size(table.snapshots()));
 
     table.newAppend().commit();
@@ -320,7 +320,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     table.newAppend().commit();
     expectedSnapshots.add(table.currentSnapshot());
 
-    new MixedTableMaintainer(table).expireSnapshots(tableRuntime);
+    new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
     Assert.assertEquals(3, Iterables.size(table.snapshots()));
     Assert.assertTrue(
         Iterators.elementsEqual(expectedSnapshots.iterator(), 
table.snapshots().iterator()));
@@ -344,7 +344,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
 
     List<DataFile> newDataFiles = writeAndCommitBaseStore(table);
     Assert.assertEquals(3, Iterables.size(table.snapshots()));
-    new 
MixedTableMaintainer(table).expireSnapshots(System.currentTimeMillis(), 1);
+    new MixedTableMaintainer(table, 
null).expireSnapshots(System.currentTimeMillis(), 1);
     Assert.assertEquals(1, Iterables.size(table.snapshots()));
 
     dataFiles.forEach(file -> 
Assert.assertFalse(table.io().exists(file.path().toString())));
@@ -392,7 +392,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     Assert.assertEquals(12, 
Iterables.size(testKeyedTable.changeTable().newScan().planFiles()));
     Assert.assertEquals(3, 
Iterables.size(testKeyedTable.changeTable().snapshots()));
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testKeyedTable);
+    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testKeyedTable, null);
     tableMaintainer.getChangeMaintainer().expireFiles(secondCommitTime + 1);
     tableMaintainer.getChangeMaintainer().expireSnapshots(secondCommitTime + 
1, 1);
 
@@ -460,7 +460,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     Assert.assertTrue(baseTable.io().exists(file1.path()));
     Assert.assertTrue(baseTable.io().exists(file2.path()));
     Assert.assertTrue(baseTable.io().exists(file3.path()));
-    new MixedTableMaintainer(testKeyedTable).expireSnapshots(expireTime, 1);
+    new MixedTableMaintainer(testKeyedTable, null).expireSnapshots(expireTime, 
1);
 
     Assert.assertEquals(1, Iterables.size(baseTable.snapshots()));
     Assert.assertFalse(baseTable.io().exists(file1.path()));
@@ -491,15 +491,16 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     Mockito.when(optimizingState.getTableConfiguration())
         
.thenReturn(TableConfigurations.parseTableConfig(testKeyedTable.properties()));
 
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testKeyedTable);
+    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testKeyedTable, tableRuntime);
     testKeyedTable.updateProperties().set(TableProperties.CHANGE_DATA_TTL, 
"0").commit();
-    tableMaintainer.expireSnapshots(tableRuntime);
+    tableMaintainer.expireSnapshots();
     Assert.assertEquals(2, 
Iterables.size(testKeyedTable.changeTable().snapshots()));
 
     testKeyedTable.updateProperties().set("gc.enabled", "true").commit();
     Mockito.when(tableRuntime.getTableConfiguration())
         
.thenReturn(TableConfigurations.parseTableConfig(testKeyedTable.properties()));
-    tableMaintainer.expireSnapshots(tableRuntime);
+    tableMaintainer = new MixedTableMaintainer(testKeyedTable, tableRuntime);
+    tableMaintainer.expireSnapshots();
     Assert.assertEquals(1, 
Iterables.size(testKeyedTable.changeTable().snapshots()));
   }
 
@@ -521,21 +522,22 @@ public class TestSnapshotExpire extends ExecutorTestBase {
         .thenReturn(ServerTableIdentifier.of(testUnkeyedTable.id(), 
getTestFormat()));
     Mockito.when(tableRuntime.getOptimizingState().getOptimizingStatus())
         .thenReturn(OptimizingStatus.IDLE);
-    Mockito.when(tableRuntime.getTableConfiguration())
-        
.thenReturn(TableConfigurations.parseTableConfig(testUnkeyedTable.properties()));
-
-    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testUnkeyedTable);
     testUnkeyedTable
         .updateProperties()
         .set(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, "0")
         .commit();
-    tableMaintainer.expireSnapshots(tableRuntime);
+    Mockito.when(tableRuntime.getTableConfiguration())
+        
.thenReturn(TableConfigurations.parseTableConfig(testUnkeyedTable.properties()));
+
+    MixedTableMaintainer tableMaintainer = new 
MixedTableMaintainer(testUnkeyedTable, tableRuntime);
+    tableMaintainer.expireSnapshots();
     Assert.assertEquals(2, Iterables.size(testUnkeyedTable.snapshots()));
 
     testUnkeyedTable.updateProperties().set("gc.enabled", "true").commit();
     Mockito.when(tableRuntime.getTableConfiguration())
         
.thenReturn(TableConfigurations.parseTableConfig(testUnkeyedTable.properties()));
-    tableMaintainer.expireSnapshots(tableRuntime);
+    tableMaintainer = new MixedTableMaintainer(testUnkeyedTable, tableRuntime);
+    tableMaintainer.expireSnapshots();
     Assert.assertEquals(1, Iterables.size(testUnkeyedTable.snapshots()));
   }
 
@@ -563,7 +565,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     Mockito.when(tableRuntime.getOptimizingState().getOptimizingStatus())
         .thenReturn(OptimizingStatus.IDLE);
 
-    new MixedTableMaintainer(table).expireSnapshots(tableRuntime);
+    new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
     Assert.assertEquals(2, Iterables.size(table.snapshots()));
 
     table.newAppend().commit();
@@ -571,7 +573,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
     table.newAppend().commit();
     expectedSnapshots.add(table.currentSnapshot());
 
-    new MixedTableMaintainer(table).expireSnapshots(tableRuntime);
+    new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
     Assert.assertEquals(3, Iterables.size(table.snapshots()));
     Assert.assertTrue(
         Iterators.elementsEqual(expectedSnapshots.iterator(), 
table.snapshots().iterator()));
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
index dd9f198c8..ef1609d5b 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
@@ -114,7 +114,7 @@ public class TestSnapshotExpireHive extends 
TestSnapshotExpire {
         isKeyedTable()
             ? getMixedTable().asKeyedTable().baseTable()
             : getMixedTable().asUnkeyedTable();
-    MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(getMixedTable());
+    MixedTableMaintainer mixedTableMaintainer = new 
MixedTableMaintainer(getMixedTable(), null);
     
mixedTableMaintainer.getBaseMaintainer().expireSnapshots(System.currentTimeMillis(),
 1);
     Assert.assertEquals(1, Iterables.size(unkeyedTable.snapshots()));
 
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java
index 67c666bd7..db8162494 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java
@@ -21,6 +21,7 @@ package org.apache.amoro.server.table;
 import org.apache.amoro.CommonUnifiedCatalog;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.UnifiedCatalog;
 import org.apache.amoro.api.CatalogMeta;
@@ -300,7 +301,7 @@ public class AMSTableTestBase extends AMSServiceTestBase {
         tableTestHelper().primaryKeySpec().primaryKeyExisted(), 
mixedTable.isKeyedTable());
   }
 
-  protected void validateTableRuntime(DefaultTableRuntime tableRuntime) {
+  protected void validateTableRuntime(TableRuntime tableRuntime) {
     Assert.assertEquals(serverTableIdentifier(), 
tableRuntime.getTableIdentifier());
   }
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
index 0a0dd445b..85d55baaf 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
@@ -22,6 +22,7 @@ import org.apache.amoro.AmoroTable;
 import org.apache.amoro.BasicTableTestHelper;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.catalog.BasicCatalogTestHelper;
 import org.apache.amoro.catalog.CatalogTestHelper;
@@ -99,8 +100,7 @@ public class TestDefaultTableRuntimeHandler extends 
AMSTableTestBase {
     MixedTable mixedTable = (MixedTable) 
tableService().loadTable(createTableId).originalTable();
 
     mixedTable.updateProperties().set(TableProperties.ENABLE_ORPHAN_CLEAN, 
"true").commit();
-    tableService()
-        .getRuntime(createTableId.getId())
+    getDefaultTableRuntime(createTableId.getId())
         .getOptimizingState()
         .refresh(tableService.loadTable(serverTableIdentifier()));
     Assert.assertEquals(1, handler.getConfigChangedTables().size());
@@ -133,39 +133,38 @@ public class TestDefaultTableRuntimeHandler extends 
AMSTableTestBase {
 
   static class TestHandler extends RuntimeHandlerChain {
 
-    private final List<DefaultTableRuntime> initTables = Lists.newArrayList();
-    private final List<Pair<DefaultTableRuntime, OptimizingStatus>> 
statusChangedTables =
+    private final List<TableRuntime> initTables = Lists.newArrayList();
+    private final List<Pair<TableRuntime, OptimizingStatus>> 
statusChangedTables =
         Lists.newArrayList();
-    private final List<Pair<DefaultTableRuntime, TableConfiguration>> 
configChangedTables =
+    private final List<Pair<TableRuntime, TableConfiguration>> 
configChangedTables =
         Lists.newArrayList();
-    private final List<Pair<MixedTable, DefaultTableRuntime>> addedTables = 
Lists.newArrayList();
-    private final List<DefaultTableRuntime> removedTables = 
Lists.newArrayList();
+    private final List<Pair<MixedTable, TableRuntime>> addedTables = 
Lists.newArrayList();
+    private final List<TableRuntime> removedTables = Lists.newArrayList();
     private boolean disposed = false;
 
     @Override
-    protected void handleStatusChanged(
-        DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {
+    protected void handleStatusChanged(TableRuntime tableRuntime, 
OptimizingStatus originalStatus) {
       statusChangedTables.add(Pair.of(tableRuntime, originalStatus));
     }
 
     @Override
     protected void handleConfigChanged(
-        DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+        TableRuntime tableRuntime, TableConfiguration originalConfig) {
       configChangedTables.add(Pair.of(tableRuntime, originalConfig));
     }
 
     @Override
-    protected void handleTableAdded(AmoroTable<?> table, DefaultTableRuntime 
tableRuntime) {
+    protected void handleTableAdded(AmoroTable<?> table, TableRuntime 
tableRuntime) {
       addedTables.add(Pair.of((MixedTable) table.originalTable(), 
tableRuntime));
     }
 
     @Override
-    protected void handleTableRemoved(DefaultTableRuntime tableRuntime) {
+    protected void handleTableRemoved(TableRuntime tableRuntime) {
       removedTables.add(tableRuntime);
     }
 
     @Override
-    protected void initHandler(List<DefaultTableRuntime> tableRuntimeList) {
+    protected void initHandler(List<TableRuntime> tableRuntimeList) {
       initTables.addAll(tableRuntimeList);
     }
 
@@ -174,23 +173,19 @@ public class TestDefaultTableRuntimeHandler extends 
AMSTableTestBase {
       disposed = true;
     }
 
-    public List<DefaultTableRuntime> getInitTables() {
+    public List<TableRuntime> getInitTables() {
       return initTables;
     }
 
-    public List<Pair<DefaultTableRuntime, OptimizingStatus>> 
getStatusChangedTables() {
-      return statusChangedTables;
-    }
-
-    public List<Pair<DefaultTableRuntime, TableConfiguration>> 
getConfigChangedTables() {
+    public List<Pair<TableRuntime, TableConfiguration>> 
getConfigChangedTables() {
       return configChangedTables;
     }
 
-    public List<Pair<MixedTable, DefaultTableRuntime>> getAddedTables() {
+    public List<Pair<MixedTable, TableRuntime>> getAddedTables() {
       return addedTables;
     }
 
-    public List<DefaultTableRuntime> getRemovedTables() {
+    public List<TableRuntime> getRemovedTables() {
       return removedTables;
     }
 
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeManager.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeManager.java
index 51e8829c7..17f0e6349 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeManager.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeManager.java
@@ -71,7 +71,7 @@ public class TestDefaultTableRuntimeManager extends 
AMSTableTestBase {
 
   @Test
   public void testTableRuntime() {
-    DefaultTableRuntime tableRuntime = 
tableService().getRuntime(serverTableIdentifier().getId());
+    DefaultTableRuntime tableRuntime = 
getDefaultTableRuntime(serverTableIdentifier().getId());
     validateTableRuntime(tableRuntime);
   }
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
index c5dc72f2e..7985ce844 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
@@ -513,7 +513,7 @@ public class TestSyncTableOfExternalCatalog extends 
AMSTableTestBase {
     Assert.assertEquals(1, 
tableRuntimeMetaListForOptimizerGroupAfterAddTable.size());
     Assert.assertEquals(1, tableRuntimeMetaListAfterAddTable.size());
 
-    DefaultTableRuntime tableRuntime = 
tableService().getRuntime(tableIdentifier.getId());
+    DefaultTableRuntime tableRuntime = 
getDefaultTableRuntime(tableIdentifier.getId());
     // create a tableIdentifier that throw exceptions when calling the 
getTableName() method, which
     // can lead to exceptions when deleting tableIdentifier in disposeTable().
     try (AutoCloseable ignored = MockitoAnnotations.openMocks(this)) {
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java
index 7332abd39..e792db0d7 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java
@@ -346,8 +346,7 @@ public class TestTableManager extends AMSTableTestBase {
   }
 
   private boolean isTableRuntimeBlocked(BlockableOperation operation) {
-    return tableService()
-        .getRuntime(serverTableIdentifier().getId())
+    return getDefaultTableRuntime(serverTableIdentifier().getId())
         .getOptimizingState()
         .isBlocked(operation);
   }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
index 61454fff3..588f7f83c 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
@@ -105,7 +105,7 @@ public class TestTableSummaryMetrics extends 
AMSTableTestBase {
             .asUnkeyedTable();
     appendData(table);
     appendPosDelete(table);
-    DefaultTableRuntime runtime = 
tableService().getRuntime(serverTableIdentifier().getId());
+    DefaultTableRuntime runtime = 
getDefaultTableRuntime(serverTableIdentifier().getId());
     
runtime.getOptimizingState().refresh(tableService().loadTable(serverTableIdentifier()));
   }
 
@@ -143,7 +143,7 @@ public class TestTableSummaryMetrics extends 
AMSTableTestBase {
   void refreshPending() {
     TableRuntimeRefreshExecutor refresher =
         new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, 
Integer.MAX_VALUE);
-    
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
+    refresher.execute(getDefaultTableRuntime(serverTableIdentifier().getId()));
     refresher.dispose();
   }
 
diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java 
b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
index 6509843ef..9ee7cf184 100644
--- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
+++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
@@ -65,4 +65,7 @@ public interface TableRuntime {
   default TableFormat getFormat() {
     return getTableIdentifier().getFormat();
   }
+
+  /** Dispose the table runtime. */
+  default void dispose() {}
 }

Reply via email to