This is an automated email from the ASF dual-hosted git repository.

baiyangtx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new f6bad83ba [Improvement] Refactor table_optimizing_process into a more generic structure (#3715)
f6bad83ba is described below

commit f6bad83bab802849730f6e0035179a4c8b8fee88
Author: baiyangtx <[email protected]>
AuthorDate: Tue Aug 12 16:24:30 2025 +0800

    [Improvement] Refactor table_optimizing_process into a more generic structure (#3715)
    
    * [Improvement] Refactor table_optimizing_process into a more generic structure
    
    * Fix COMMENTS
    
    ---------
    
    Co-authored-by: zhangyongxiang.alpha <[email protected]>
---
 .../amoro/server/DefaultOptimizingService.java     |   8 +-
 .../dashboard/MixedAndIcebergTableDescriptor.java  |  81 +++++-----
 .../server/optimizing/OptimizingProcessMeta.java   | 167 ---------------------
 .../amoro/server/optimizing/OptimizingQueue.java   |  45 +++---
 .../amoro/server/optimizing/TaskRuntime.java       |   4 +-
 .../persistence/SqlSessionFactoryProvider.java     |   6 +-
 .../server/persistence/TaskFilesPersistence.java   |   8 +-
 ...ingMapper.java => OptimizingProcessMapper.java} | 124 +++------------
 .../server/persistence/mapper/TableMetaMapper.java |  27 ++--
 .../persistence/mapper/TableProcessMapper.java     | 116 ++++++++++++++
 .../amoro/server/process/TableProcessMeta.java     | 116 ++++++++++++++
 .../inline/OptimizingExpiringExecutor.java         |  16 +-
 .../amoro/server/table/DefaultOptimizingState.java |   6 +-
 .../amoro/server/table/DefaultTableManager.java    |   8 +-
 .../src/main/resources/derby/ams-derby-init.sql    |  43 +++---
 .../src/main/resources/mysql/ams-mysql-init.sql    |  29 ++--
 amoro-ams/src/main/resources/mysql/upgrade.sql     |  54 ++++++-
 .../main/resources/postgres/ams-postgres-init.sql  |  83 +++++-----
 amoro-ams/src/main/resources/postgres/upgrade.sql  |  91 +++++++++++
 .../TestIcebergServerTableDescriptor.java          |  25 ++-
 .../server/optimizing/BaseOptimizingChecker.java   |  82 +++++-----
 .../optimizing/TestIcebergHadoopOptimizing.java    |  11 +-
 .../server/optimizing/TestMixedHiveOptimizing.java |   5 +-
 .../optimizing/TestMixedIcebergOptimizing.java     |  11 +-
 .../apache/amoro/optimizing/MetricsSummary.java    |  37 +++++
 25 files changed, 718 insertions(+), 485 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 397c473eb..099021cde 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -36,14 +36,14 @@ import org.apache.amoro.exception.TaskNotFoundException;
 import org.apache.amoro.resource.ResourceGroup;
 import org.apache.amoro.server.catalog.CatalogManager;
 import org.apache.amoro.server.optimizing.OptimizingProcess;
-import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
 import org.apache.amoro.server.optimizing.OptimizingQueue;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
 import org.apache.amoro.server.optimizing.TaskRuntime;
 import org.apache.amoro.server.persistence.StatedPersistentBase;
 import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
 import org.apache.amoro.server.persistence.mapper.ResourceMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
+import org.apache.amoro.server.process.TableProcessMeta;
 import org.apache.amoro.server.resource.OptimizerInstance;
 import org.apache.amoro.server.resource.OptimizerManager;
 import org.apache.amoro.server.resource.OptimizerThread;
@@ -288,8 +288,8 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
 
   @Override
   public boolean cancelProcess(long processId) throws TException {
-    OptimizingProcessMeta processMeta =
-        getAs(OptimizingMapper.class, m -> m.getOptimizingProcess(processId));
+    TableProcessMeta processMeta =
+        getAs(TableProcessMapper.class, m -> m.getProcessMeta(processId));
     if (processMeta == null) {
       return false;
     }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
index c5c5c23a0..df035ee0b 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
@@ -22,6 +22,7 @@ import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
 import org.apache.amoro.AmoroTable;
+import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.api.CommitMetaProducer;
 import org.apache.amoro.data.DataFileType;
@@ -35,12 +36,14 @@ import 
org.apache.amoro.server.dashboard.model.TableBasicInfo;
 import org.apache.amoro.server.dashboard.model.TableStatistics;
 import org.apache.amoro.server.dashboard.utils.AmsUtil;
 import org.apache.amoro.server.dashboard.utils.TableStatCollector;
-import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
 import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
 import org.apache.amoro.server.optimizing.TaskRuntime;
 import org.apache.amoro.server.persistence.PersistentBase;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
+import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
+import org.apache.amoro.server.process.TableProcessMeta;
 import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
@@ -510,22 +513,27 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
   public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
       AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit, 
int offset) {
     TableIdentifier tableIdentifier = amoroTable.id();
+    ServerTableIdentifier identifier =
+        getAs(
+            TableMetaMapper.class,
+            m ->
+                m.selectTableIdentifier(
+                    tableIdentifier.getCatalog(),
+                    tableIdentifier.getDatabase(),
+                    tableIdentifier.getTableName()));
+    if (identifier == null) {
+      return Pair.of(Collections.emptyList(), 0);
+    }
     int total = 0;
     // page helper is 1-based
     int pageNumber = (offset / limit) + 1;
-    List<OptimizingProcessMeta> processMetaList = Collections.emptyList();
+    List<TableProcessMeta> processMetaList = Collections.emptyList();
     try (Page<?> ignored = PageHelper.startPage(pageNumber, limit, true)) {
       processMetaList =
           getAs(
-              OptimizingMapper.class,
-              mapper ->
-                  mapper.selectOptimizingProcesses(
-                      tableIdentifier.getCatalog(),
-                      tableIdentifier.getDatabase(),
-                      tableIdentifier.getTableName(),
-                      type,
-                      status));
-      PageInfo<OptimizingProcessMeta> pageInfo = new 
PageInfo<>(processMetaList);
+              TableProcessMapper.class,
+              mapper -> mapper.listProcessMeta(identifier.getId(), type, 
status));
+      PageInfo<TableProcessMeta> pageInfo = new PageInfo<>(processMetaList);
       total = (int) pageInfo.getTotal();
       LOG.info(
           "Get optimizing processes total : {} , pageNumber:{}, limit:{}, 
offset:{}",
@@ -538,17 +546,19 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
       }
     }
     List<Long> processIds =
-        processMetaList.stream()
-            .map(OptimizingProcessMeta::getProcessId)
-            .collect(Collectors.toList());
+        
processMetaList.stream().map(TableProcessMeta::getProcessId).collect(Collectors.toList());
     Map<Long, List<OptimizingTaskMeta>> optimizingTasks =
-        getAs(OptimizingMapper.class, mapper -> 
mapper.selectOptimizeTaskMetas(processIds)).stream()
+        getAs(OptimizingProcessMapper.class, mapper -> 
mapper.selectOptimizeTaskMetas(processIds))
+            .stream()
             .collect(Collectors.groupingBy(OptimizingTaskMeta::getProcessId));
 
     LOG.info("Get {} optimizing tasks. ", optimizingTasks.size());
     return Pair.of(
         processMetaList.stream()
-            .map(p -> buildOptimizingProcessInfo(p, 
optimizingTasks.get(p.getProcessId())))
+            .map(
+                p ->
+                    buildOptimizingProcessInfo(
+                        identifier, p, optimizingTasks.get(p.getProcessId())))
             .collect(Collectors.toList()),
         total);
   }
@@ -568,7 +578,7 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
     long id = Long.parseLong(processId);
     List<OptimizingTaskMeta> optimizingTaskMetaList =
         getAs(
-            OptimizingMapper.class,
+            OptimizingProcessMapper.class,
             mapper -> 
mapper.selectOptimizeTaskMetas(Collections.singletonList(id)));
     if (CollectionUtils.isEmpty(optimizingTaskMetaList)) {
       return Collections.emptyList();
@@ -833,7 +843,9 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
   }
 
   private static OptimizingProcessInfo buildOptimizingProcessInfo(
-      OptimizingProcessMeta meta, List<OptimizingTaskMeta> 
optimizingTaskStats) {
+      ServerTableIdentifier identifier,
+      TableProcessMeta meta,
+      List<OptimizingTaskMeta> optimizingTaskStats) {
     if (meta == null) {
       return null;
     }
@@ -858,28 +870,25 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
       result.setSuccessTasks(successTasks);
       result.setRunningTasks(runningTasks);
     }
-    MetricsSummary summary = meta.getSummary();
-    if (summary != null) {
-      result.setInputFiles(summary.getInputFilesStatistics());
-      result.setOutputFiles(summary.getOutputFilesStatistics());
-    }
-
+    MetricsSummary summary = MetricsSummary.fromMap(meta.getSummary());
+    result.setInputFiles(summary.getInputFilesStatistics());
+    result.setOutputFiles(summary.getOutputFilesStatistics());
     result.setTableId(meta.getTableId());
-    result.setCatalogName(meta.getCatalogName());
-    result.setDbName(meta.getDbName());
-    result.setTableName(meta.getTableName());
+    result.setCatalogName(identifier.getCatalog());
+    result.setDbName(identifier.getDatabase());
+    result.setTableName(identifier.getTableName());
 
     result.setProcessId(String.valueOf(meta.getProcessId()));
-    result.setStartTime(meta.getPlanTime());
-    result.setOptimizingType(meta.getOptimizingType().name());
+    result.setStartTime(meta.getCreateTime());
+    result.setOptimizingType(meta.getProcessType());
     result.setStatus(ProcessStatus.valueOf(meta.getStatus().name()));
-    result.setFailReason(meta.getFailReason());
+    result.setFailReason(meta.getFailMessage());
     result.setDuration(
-        meta.getEndTime() > 0
-            ? meta.getEndTime() - meta.getPlanTime()
-            : System.currentTimeMillis() - meta.getPlanTime());
-    result.setFinishTime(meta.getEndTime());
-    result.setSummary(meta.getSummary().summaryAsMap(true));
+        meta.getFinishTime() > 0
+            ? meta.getFinishTime() - meta.getCreateTime()
+            : System.currentTimeMillis() - meta.getCreateTime());
+    result.setFinishTime(meta.getFinishTime());
+    result.setSummary(meta.getSummary());
     return result;
   }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcessMeta.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcessMeta.java
deleted file mode 100644
index 8fae30c41..000000000
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcessMeta.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.optimizing;
-
-import org.apache.amoro.optimizing.MetricsSummary;
-import org.apache.amoro.optimizing.OptimizingType;
-import org.apache.amoro.process.ProcessStatus;
-
-import java.util.Map;
-
-/** Meta of optimizing process. */
-public class OptimizingProcessMeta {
-
-  private Long processId;
-  private Long tableId;
-  private String catalogName;
-  private String dbName;
-  private String tableName;
-  private Long targetSnapshotId;
-  private Long targetChangeSnapshotId;
-  private ProcessStatus processStatus;
-  private OptimizingType optimizingType;
-  private long planTime;
-  private long endTime;
-  private String failReason;
-  private MetricsSummary summary;
-  private Map<String, Long> fromSequence;
-  private Map<String, Long> toSequence;
-
-  public OptimizingProcessMeta() {}
-
-  public Long getProcessId() {
-    return processId;
-  }
-
-  public void setProcessId(Long processId) {
-    this.processId = processId;
-  }
-
-  public Long getTableId() {
-    return tableId;
-  }
-
-  public void setTableId(Long tableId) {
-    this.tableId = tableId;
-  }
-
-  public String getCatalogName() {
-    return catalogName;
-  }
-
-  public void setCatalogName(String catalogName) {
-    this.catalogName = catalogName;
-  }
-
-  public String getDbName() {
-    return dbName;
-  }
-
-  public void setDbName(String dbName) {
-    this.dbName = dbName;
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-
-  public void setTableName(String tableName) {
-    this.tableName = tableName;
-  }
-
-  public Long getTargetSnapshotId() {
-    return targetSnapshotId;
-  }
-
-  public void setTargetSnapshotId(Long targetSnapshotId) {
-    this.targetSnapshotId = targetSnapshotId;
-  }
-
-  public ProcessStatus getStatus() {
-    return processStatus;
-  }
-
-  public void setStatus(ProcessStatus processStatus) {
-    this.processStatus = processStatus;
-  }
-
-  public OptimizingType getOptimizingType() {
-    return optimizingType;
-  }
-
-  public void setOptimizingType(OptimizingType optimizingType) {
-    this.optimizingType = optimizingType;
-  }
-
-  public long getPlanTime() {
-    return planTime;
-  }
-
-  public void setPlanTime(long planTime) {
-    this.planTime = planTime;
-  }
-
-  public long getEndTime() {
-    return endTime;
-  }
-
-  public void setEndTime(long endTime) {
-    this.endTime = endTime;
-  }
-
-  public String getFailReason() {
-    return failReason;
-  }
-
-  public void setFailReason(String failReason) {
-    this.failReason = failReason;
-  }
-
-  public MetricsSummary getSummary() {
-    return summary;
-  }
-
-  public void setSummary(MetricsSummary summary) {
-    this.summary = summary;
-  }
-
-  public Long getTargetChangeSnapshotId() {
-    return targetChangeSnapshotId;
-  }
-
-  public void setTargetChangeSnapshotId(Long targetChangeSnapshotId) {
-    this.targetChangeSnapshotId = targetChangeSnapshotId;
-  }
-
-  public Map<String, Long> getFromSequence() {
-    return fromSequence;
-  }
-
-  public void setFromSequence(Map<String, Long> fromSequence) {
-    this.fromSequence = fromSequence;
-  }
-
-  public Map<String, Long> getToSequence() {
-    return toSequence;
-  }
-
-  public void setToSequence(Map<String, Long> toSequence) {
-    this.toSequence = toSequence;
-  }
-}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index ed8464fe3..56888fb97 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -37,8 +37,9 @@ import org.apache.amoro.server.catalog.CatalogManager;
 import org.apache.amoro.server.manager.MetricManager;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.TaskFilesPersistence;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
 import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
 import org.apache.amoro.server.resource.OptimizerInstance;
 import org.apache.amoro.server.resource.QuotaProvider;
 import org.apache.amoro.server.table.DefaultOptimizingState;
@@ -190,10 +191,6 @@ public class OptimizingQueue extends PersistentBase {
         tableRuntime.getTableIdentifier());
   }
 
-  public boolean containsTable(ServerTableIdentifier identifier) {
-    return scheduler.getTableRuntime(identifier) != null;
-  }
-
   private void clearProcess(OptimizingProcess optimizingProcess) {
     tableQueue.removeIf(process -> process.getProcessId() == 
optimizingProcess.getProcessId());
     retryTaskQueue.removeIf(
@@ -741,22 +738,31 @@ public class OptimizingQueue extends PersistentBase {
       doAsTransaction(
           () ->
               doAs(
-                  OptimizingMapper.class,
+                  TableProcessMapper.class,
                   mapper ->
-                      mapper.insertOptimizingProcess(
-                          optimizingState.getTableIdentifier(),
+                      mapper.insertProcess(
+                          optimizingState.getTableIdentifier().getId(),
                           processId,
-                          targetSnapshotId,
-                          targetChangeSnapshotId,
                           status,
-                          optimizingType,
+                          optimizingType.name().toUpperCase(),
+                          
optimizingState.getOptimizingStatus().name().toLowerCase(),
+                          "AMORO",
                           planTime,
-                          getSummary(),
+                          getSummary().summaryAsMap(false))),
+          () ->
+              doAs(
+                  OptimizingProcessMapper.class,
+                  mapper ->
+                      mapper.insertInternalProcessState(
+                          optimizingState.getTableIdentifier().getId(),
+                          processId,
+                          targetSnapshotId,
+                          targetChangeSnapshotId,
                           fromSequence,
                           toSequence)),
           () ->
               doAs(
-                  OptimizingMapper.class,
+                  OptimizingProcessMapper.class,
                   mapper -> 
mapper.insertTaskRuntimes(Lists.newArrayList(taskMap.values()))),
           () -> TaskFilesPersistence.persistTaskInputs(processId, 
taskMap.values()),
           () -> optimizingState.beginProcess(this));
@@ -771,15 +777,16 @@ public class OptimizingQueue extends PersistentBase {
           },
           () ->
               doAs(
-                  OptimizingMapper.class,
+                  TableProcessMapper.class,
                   mapper ->
-                      mapper.updateOptimizingProcess(
+                      mapper.updateProcess(
                           optimizingState.getTableIdentifier().getId(),
                           processId,
                           status,
-                          endTime,
-                          getSummary(),
-                          getFailedReason())),
+                          
optimizingState.getOptimizingStatus().name().toLowerCase(),
+                          System.currentTimeMillis(),
+                          getFailedReason(),
+                          getSummary().summaryAsMap(false))),
           () -> optimizingState.completeProcess(success),
           () -> clearProcess(this));
     }
@@ -792,7 +799,7 @@ public class OptimizingQueue extends PersistentBase {
       try {
         List<TaskRuntime<RewriteStageTask>> taskRuntimes =
             getAs(
-                OptimizingMapper.class,
+                OptimizingProcessMapper.class,
                 mapper ->
                     mapper.selectTaskRuntimes(
                         optimizingState.getTableIdentifier().getId(), 
processId));
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
index 9f7071c91..f6db6e430 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
@@ -28,7 +28,7 @@ import org.apache.amoro.process.SimpleFuture;
 import org.apache.amoro.process.StagedTaskDescriptor;
 import org.apache.amoro.server.AmoroServiceConstants;
 import org.apache.amoro.server.persistence.StatedPersistentBase;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
 import org.apache.amoro.server.resource.OptimizerThread;
 import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
 import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
@@ -259,7 +259,7 @@ public class TaskRuntime<T extends StagedTaskDescriptor<?, 
?, ?>> extends Stated
   }
 
   private void persistTaskRuntime() {
-    doAs(OptimizingMapper.class, mapper -> mapper.updateTaskRuntime(this));
+    doAs(OptimizingProcessMapper.class, mapper -> 
mapper.updateTaskRuntime(this));
   }
 
   public TaskQuota getCurrentQuota() {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
index 608472d16..74544e178 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
@@ -27,11 +27,12 @@ import 
com.github.pagehelper.dialect.helper.SqlServer2012Dialect;
 import org.apache.amoro.server.persistence.mapper.ApiTokensMapper;
 import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
 import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
 import org.apache.amoro.server.persistence.mapper.PlatformFileMapper;
 import org.apache.amoro.server.persistence.mapper.ResourceMapper;
 import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
 import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.ibatis.mapping.DatabaseIdProvider;
 import org.apache.ibatis.mapping.Environment;
@@ -63,13 +64,14 @@ public class SqlSessionFactoryProvider {
     Environment environment = new Environment("develop", transactionFactory, 
dataSource);
     Configuration configuration = new Configuration(environment);
     configuration.addMapper(TableMetaMapper.class);
-    configuration.addMapper(OptimizingMapper.class);
+    configuration.addMapper(OptimizingProcessMapper.class);
     configuration.addMapper(CatalogMetaMapper.class);
     configuration.addMapper(OptimizerMapper.class);
     configuration.addMapper(ApiTokensMapper.class);
     configuration.addMapper(PlatformFileMapper.class);
     configuration.addMapper(ResourceMapper.class);
     configuration.addMapper(TableBlockerMapper.class);
+    configuration.addMapper(TableProcessMapper.class);
 
     PageInterceptor interceptor = new PageInterceptor();
     Properties interceptorProperties = new Properties();
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java
index 92a07a5ab..1163960f5 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java
@@ -22,7 +22,7 @@ import org.apache.amoro.optimizing.RewriteFilesInput;
 import org.apache.amoro.optimizing.RewriteFilesOutput;
 import org.apache.amoro.optimizing.RewriteStageTask;
 import org.apache.amoro.server.optimizing.TaskRuntime;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
 import org.apache.amoro.server.utils.CompressUtil;
 import org.apache.amoro.utils.SerializationUtil;
 
@@ -49,7 +49,7 @@ public class TaskFilesPersistence {
   public static Map<Integer, RewriteFilesInput> loadTaskInputs(long processId) 
{
     List<byte[]> bytes =
         persistence.getAs(
-            OptimizingMapper.class, mapper -> 
mapper.selectProcessInputFiles(processId));
+            OptimizingProcessMapper.class, mapper -> 
mapper.selectProcessInputFiles(processId));
     if (bytes == null || bytes.isEmpty()) {
       return Collections.emptyMap();
     } else {
@@ -64,7 +64,9 @@ public class TaskFilesPersistence {
   private static class DatabasePersistence extends PersistentBase {
 
     public void persistTaskInputs(long processId, Map<Integer, 
RewriteFilesInput> tasks) {
-      doAs(OptimizingMapper.class, mapper -> 
mapper.updateProcessInputFiles(processId, tasks));
+      doAs(
+          OptimizingProcessMapper.class,
+          mapper -> mapper.updateProcessInputFiles(processId, tasks));
     }
   }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingProcessMapper.java
similarity index 68%
rename from 
amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java
rename to 
amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingProcessMapper.java
index 3124a6670..abf6a30da 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingProcessMapper.java
@@ -18,20 +18,14 @@
 
 package org.apache.amoro.server.persistence.mapper;
 
-import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.optimizing.MetricsSummary;
-import org.apache.amoro.optimizing.OptimizingType;
 import org.apache.amoro.optimizing.RewriteFilesInput;
 import org.apache.amoro.optimizing.RewriteStageTask;
-import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.process.StagedTaskDescriptor;
-import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
 import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
 import org.apache.amoro.server.optimizing.TaskRuntime;
 import org.apache.amoro.server.persistence.converter.JsonObjectConverter;
 import org.apache.amoro.server.persistence.converter.Long2TsConverter;
 import org.apache.amoro.server.persistence.converter.Map2StringConverter;
-import org.apache.amoro.server.persistence.converter.MapLong2StringConverter;
 import org.apache.amoro.server.persistence.converter.Object2ByteArrayConvert;
 import 
org.apache.amoro.server.persistence.converter.TaskDescriptorTypeConverter;
 import 
org.apache.amoro.server.persistence.extension.InListExtendedLanguageDriver;
@@ -50,102 +44,36 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-public interface OptimizingMapper {
-
-  /** OptimizingProcess operation below */
-  @Delete(
-      "DELETE FROM table_optimizing_process WHERE table_id = #{tableId} and 
process_id < #{expireId}")
-  void deleteOptimizingProcessBefore(
-      @Param("tableId") long tableId, @Param("expireId") long expireId);
-
+public interface OptimizingProcessMapper {
   @Insert(
-      "INSERT INTO table_optimizing_process(table_id, catalog_name, db_name, 
table_name ,process_id,"
-          + " target_snapshot_id, target_change_snapshot_id, status, 
optimizing_type, plan_time, summary, from_sequence,"
-          + " to_sequence) VALUES (#{table.id}, #{table.catalog},"
-          + " #{table.database}, #{table.tableName}, #{processId}, 
#{targetSnapshotId}, #{targetChangeSnapshotId},"
-          + " #{status}, #{optimizingType},"
-          + " #{planTime, 
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter},"
-          + " #{summary, 
typeHandler=org.apache.amoro.server.persistence.converter.JsonObjectConverter},"
+      "INSERT INTO optimizing_process_state "
+          + "(process_id, table_id, target_snapshot_id, 
target_change_snapshot_id, "
+          + "from_sequence, to_sequence) "
+          + "VALUES (#{processId}, #{tableId}, #{targetSnapshotId}, 
#{targetChangeSnapshotId}, "
           + " #{fromSequence, 
typeHandler=org.apache.amoro.server.persistence.converter.MapLong2StringConverter},"
-          + " #{toSequence, 
typeHandler=org.apache.amoro.server.persistence.converter.MapLong2StringConverter}"
-          + ")")
-  void insertOptimizingProcess(
-      @Param("table") ServerTableIdentifier tableIdentifier,
+          + " #{toSequence, 
typeHandler=org.apache.amoro.server.persistence.converter.MapLong2StringConverter})")
+  void insertInternalProcessState(
+      @Param("tableId") long tableId,
       @Param("processId") long processId,
       @Param("targetSnapshotId") long targetSnapshotId,
       @Param("targetChangeSnapshotId") long targetChangeSnapshotId,
-      @Param("status") ProcessStatus status,
-      @Param("optimizingType") OptimizingType optimizingType,
-      @Param("planTime") long planTime,
-      @Param("summary") MetricsSummary summary,
       @Param("fromSequence") Map<String, Long> fromSequence,
       @Param("toSequence") Map<String, Long> toSequence);
 
-  @Update(
-      "UPDATE table_optimizing_process SET status = #{optimizingStatus},"
-          + " end_time = #{endTime, 
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}, "
-          + "summary = #{summary, 
typeHandler=org.apache.amoro.server.persistence.converter.JsonObjectConverter}, 
"
-          + "fail_reason = #{failedReason, jdbcType=VARCHAR}"
-          + " WHERE table_id = #{tableId} AND process_id = #{processId}")
-  void updateOptimizingProcess(
-      @Param("tableId") long tableId,
-      @Param("processId") long processId,
-      @Param("optimizingStatus") ProcessStatus status,
-      @Param("endTime") long endTime,
-      @Param("summary") MetricsSummary summary,
-      @Param("failedReason") String failedReason);
+  @Delete(
+      "DELETE FROM optimizing_process_state WHERE process_id <= #{processId} 
AND table_id = #{tableId}")
+  void deleteProcessStateBefore(@Param("tableId") long tableId, 
@Param("processId") long processId);
 
-  @Select(
-      "<script>"
-          + "SELECT a.process_id, a.table_id, a.catalog_name, a.db_name, 
a.table_name, a.target_snapshot_id,"
-          + " a.target_change_snapshot_id, a.status, a.optimizing_type, 
a.plan_time, a.end_time,"
-          + " a.fail_reason, a.summary, a.from_sequence, a.to_sequence FROM 
table_optimizing_process a"
-          + " INNER JOIN table_identifier b ON a.table_id = b.table_id"
-          + " WHERE a.catalog_name = #{catalogName} AND a.db_name = #{dbName} 
AND a.table_name = #{tableName}"
-          + " AND b.catalog_name = #{catalogName} AND b.db_name = #{dbName} 
AND b.table_name = #{tableName}"
-          + " <if test='optimizingType != null'> AND a.optimizing_type = 
#{optimizingType}</if>"
-          + " <if test='optimizingStatus != null'> AND a.status = 
#{optimizingStatus}</if>"
-          + " ORDER BY process_id desc"
-          + "</script>")
-  @Results(
-      id = "processMeta",
-      value = {
-        @Result(property = "processId", column = "process_id"),
-        @Result(property = "tableId", column = "table_id"),
-        @Result(property = "catalogName", column = "catalog_name"),
-        @Result(property = "dbName", column = "db_name"),
-        @Result(property = "tableName", column = "table_name"),
-        @Result(property = "targetSnapshotId", column = "target_snapshot_id"),
-        @Result(property = "targetChangeSnapshotId", column = 
"target_change_snapshot_id"),
-        @Result(property = "status", column = "status"),
-        @Result(property = "optimizingType", column = "optimizing_type"),
-        @Result(property = "planTime", column = "plan_time", typeHandler = 
Long2TsConverter.class),
-        @Result(property = "endTime", column = "end_time", typeHandler = 
Long2TsConverter.class),
-        @Result(property = "failReason", column = "fail_reason"),
-        @Result(property = "summary", column = "summary", typeHandler = 
JsonObjectConverter.class),
-        @Result(
-            property = "fromSequence",
-            column = "from_sequence",
-            typeHandler = MapLong2StringConverter.class),
-        @Result(
-            property = "toSequence",
-            column = "to_sequence",
-            typeHandler = MapLong2StringConverter.class)
-      })
-  List<OptimizingProcessMeta> selectOptimizingProcesses(
-      @Param("catalogName") String catalogName,
-      @Param("dbName") String dbName,
-      @Param("tableName") String tableName,
-      @Param("optimizingType") String optimizingType,
-      @Param("optimizingStatus") ProcessStatus optimizingStatus);
+  @Select("SELECT rewrite_input FROM optimizing_process_state WHERE process_id 
= #{processId}")
+  @Results({@Result(column = "rewrite_input", jdbcType = JdbcType.BLOB)})
+  List<byte[]> selectProcessInputFiles(@Param("processId") long processId);
 
-  @Select(
-      "SELECT a.process_id, a.table_id, a.catalog_name, a.db_name, 
a.table_name, a.target_snapshot_id,"
-          + " a.target_change_snapshot_id, a.status, a.optimizing_type, 
a.plan_time, a.end_time,"
-          + " a.fail_reason, a.summary, a.from_sequence, a.to_sequence FROM 
table_optimizing_process a "
-          + " WHERE a.process_id = #{processId}")
-  @ResultMap("processMeta")
-  OptimizingProcessMeta getOptimizingProcess(@Param("processId") long 
processId);
+  @Update(
+      "UPDATE optimizing_process_state SET rewrite_input = #{input, 
jdbcType=BLOB,"
+          + " 
typeHandler=org.apache.amoro.server.persistence.converter.Object2ByteArrayConvert}"
+          + " WHERE process_id = #{processId}")
+  void updateProcessInputFiles(
+      @Param("processId") long processId, @Param("input") Map<Integer, 
RewriteFilesInput> input);
 
   /** Optimizing TaskRuntime operation below */
   @Insert({
@@ -260,18 +188,6 @@ public interface OptimizingMapper {
   @Delete("DELETE FROM task_runtime WHERE table_id = #{tableId} AND process_id 
< #{expireId}")
   void deleteTaskRuntimesBefore(@Param("tableId") long tableId, 
@Param("expireId") long expireId);
 
-  /** Optimizing rewrite input and output operations below */
-  @Update(
-      "UPDATE table_optimizing_process SET rewrite_input = #{input, 
jdbcType=BLOB,"
-          + " 
typeHandler=org.apache.amoro.server.persistence.converter.Object2ByteArrayConvert}"
-          + " WHERE process_id = #{processId}")
-  void updateProcessInputFiles(
-      @Param("processId") long processId, @Param("input") Map<Integer, 
RewriteFilesInput> input);
-
-  @Select("SELECT rewrite_input FROM table_optimizing_process WHERE process_id 
= #{processId}")
-  @Results({@Result(column = "rewrite_input", jdbcType = JdbcType.BLOB)})
-  List<byte[]> selectProcessInputFiles(@Param("processId") long processId);
-
   /** Optimizing task quota operations below */
   @Select(
       "SELECT process_id, task_id, retry_num, table_id, start_time, end_time, 
fail_reason "
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java
index 1ae224672..202dcd491 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java
@@ -337,7 +337,7 @@ public interface TableMetaMapper {
           + " last_full_optimizing_time = #{runtime.lastFullOptimizingTime,"
           + " 
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter},"
           + " optimizing_status_code = #{runtime.optimizingStatus,"
-          + 
"typeHandler=org.apache.amoro.server.persistence.converter.OptimizingStatusConverter},"
+          + " 
typeHandler=org.apache.amoro.server.persistence.converter.OptimizingStatusConverter},"
           + " optimizing_status_start_time = #{runtime.currentStatusStartTime,"
           + " 
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter},"
           + " optimizing_process_id = #{runtime.processId},"
@@ -388,10 +388,13 @@ public interface TableMetaMapper {
           + " a.current_change_snapshotId, a.last_optimized_snapshotId, 
a.last_optimized_change_snapshotId,"
           + " a.last_major_optimizing_time, a.last_minor_optimizing_time, 
a.last_full_optimizing_time,"
           + " a.optimizing_status_code, a.optimizing_status_start_time, 
a.optimizing_process_id,"
-          + " a.optimizer_group, a.table_config, a.pending_input, 
a.table_summary, b.optimizing_type, b.target_snapshot_id,"
-          + " b.target_change_snapshot_id, b.plan_time, b.from_sequence, 
b.to_sequence FROM table_runtime a"
+          + " a.optimizer_group, a.table_config, a.pending_input, 
a.table_summary, "
+          + " p.process_type, s.target_snapshot_id,"
+          + " s.target_change_snapshot_id, p.create_time, s.from_sequence, 
s.to_sequence "
+          + " FROM table_runtime a "
           + " INNER JOIN table_identifier i ON a.table_id = i.table_id "
-          + " LEFT JOIN table_optimizing_process b ON a.optimizing_process_id 
= b.process_id")
+          + " LEFT JOIN table_process p ON a.optimizing_process_id = 
p.process_id "
+          + " LEFT JOIN optimizing_process_state s ON a.optimizing_process_id 
= s.process_id")
   @Results(
       id = "tableRuntimeMeta",
       value = {
@@ -440,10 +443,13 @@ public interface TableMetaMapper {
             property = "tableSummary",
             column = "table_summary",
             typeHandler = JsonObjectConverter.class),
-        @Result(property = "optimizingType", column = "optimizing_type"),
+        @Result(property = "optimizingType", column = "process_type"),
         @Result(property = "targetSnapshotId", column = "target_snapshot_id"),
         @Result(property = "targetChangeSnapshotId", column = 
"target_change_snapshot_id"),
-        @Result(property = "planTime", column = "plan_time", typeHandler = 
Long2TsConverter.class),
+        @Result(
+            property = "planTime",
+            column = "create_time",
+            typeHandler = Long2TsConverter.class),
         @Result(
             property = "fromSequence",
             column = "from_sequence",
@@ -460,10 +466,13 @@ public interface TableMetaMapper {
           + " a.current_change_snapshotId, a.last_optimized_snapshotId, 
a.last_optimized_change_snapshotId,"
           + " a.last_major_optimizing_time, a.last_minor_optimizing_time, 
a.last_full_optimizing_time,"
           + " a.optimizing_status_code, a.optimizing_status_start_time, 
a.optimizing_process_id,"
-          + " a.optimizer_group, a.table_config, a.pending_input, 
a.table_summary, b.optimizing_type, b.target_snapshot_id,"
-          + " b.target_change_snapshot_id, b.plan_time, b.from_sequence, 
b.to_sequence FROM table_runtime a"
+          + " a.optimizer_group, a.table_config, a.pending_input, 
a.table_summary, "
+          + " p.process_type, s.target_snapshot_id,"
+          + " s.target_change_snapshot_id, p.create_time, s.from_sequence, 
s.to_sequence "
+          + " FROM table_runtime a "
           + " INNER JOIN table_identifier i ON a.table_id = i.table_id "
-          + " LEFT JOIN table_optimizing_process b ON a.optimizing_process_id 
= b.process_id "
+          + " LEFT JOIN table_process p ON a.optimizing_process_id = 
p.process_id "
+          + " LEFT JOIN optimizing_process_state s ON a.optimizing_process_id 
= s.process_id"
           + " WHERE a.table_id = #{tableId}")
   @ResultMap("tableRuntimeMeta")
   TableRuntimeMeta getTableRuntimeMeta(@Param("tableId") long tableId);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java
new file mode 100644
index 000000000..0e52170d9
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence.mapper;
+
+import org.apache.amoro.process.ProcessStatus;
+import org.apache.amoro.server.persistence.converter.Long2TsConverter;
+import org.apache.amoro.server.persistence.converter.Map2StringConverter;
+import org.apache.amoro.server.process.TableProcessMeta;
+import org.apache.ibatis.annotations.Delete;
+import org.apache.ibatis.annotations.Insert;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Result;
+import org.apache.ibatis.annotations.ResultMap;
+import org.apache.ibatis.annotations.Results;
+import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.Update;
+
+import java.util.List;
+import java.util.Map;
+
+public interface TableProcessMapper {
+
+  @Delete("DELETE FROM table_process WHERE process_id <= #{processId} AND 
table_id = #{tableId}")
+  void deleteBefore(@Param("tableId") long tableId, @Param("processId") long 
processId);
+
+  @Insert(
+      "INSERT INTO table_process "
+          + "(process_id, table_id, status, process_type, process_stage, 
execution_engine, "
+          + "create_time, summary) "
+          + "VALUES (#{processId}, #{tableId}, #{status}, #{processType}, 
#{processStage}, "
+          + "#{executionEngine}, "
+          + "#{createTime, 
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}, "
+          + "#{summary, 
typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter})")
+  void insertProcess(
+      @Param("tableId") long tableId,
+      @Param("processId") long processId,
+      @Param("status") ProcessStatus status,
+      @Param("processType") String processType,
+      @Param("processStage") String processStage,
+      @Param("executionEngine") String executionEngine,
+      @Param("createTime") long createTime,
+      @Param("summary") Map<String, String> summary);
+
+  @Update(
+      "UPDATE table_process SET status = #{status}, "
+          + "process_stage = #{processStage}, "
+          + "finish_time = #{finishTime, 
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}, "
+          + "fail_message = #{failMessage, jdbcType=VARCHAR}, "
+          + "summary = #{summary, 
typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter} "
+          + "WHERE process_id = #{processId} AND table_id = #{tableId}")
+  void updateProcess(
+      @Param("tableId") long tableId,
+      @Param("processId") long processId,
+      @Param("status") ProcessStatus status,
+      @Param("processStage") String processStage,
+      @Param("finishTime") long finishTime,
+      @Param("failMessage") String failMessage,
+      @Param("summary") Map<String, String> summary);
+
+  @Select(
+      "SELECT process_id, table_id, status, process_type, process_stage, 
execution_engine, "
+          + "create_time, finish_time, fail_message, summary "
+          + "FROM table_process WHERE process_id = #{processId}")
+  @Results(
+      id = "tableProcessMap",
+      value = {
+        @Result(column = "process_id", property = "processId"),
+        @Result(column = "table_id", property = "tableId"),
+        @Result(column = "status", property = "status"),
+        @Result(column = "process_type", property = "processType"),
+        @Result(column = "process_stage", property = "processStage"),
+        @Result(column = "execution_engine", property = "executionEngine"),
+        @Result(
+            column = "create_time",
+            property = "createTime",
+            typeHandler = Long2TsConverter.class),
+        @Result(
+            column = "finish_time",
+            property = "finishTime",
+            typeHandler = Long2TsConverter.class),
+        @Result(column = "fail_message", property = "failMessage"),
+        @Result(column = "summary", property = "summary", typeHandler = 
Map2StringConverter.class)
+      })
+  TableProcessMeta getProcessMeta(@Param("processId") long processId);
+
+  @Select(
+      "<script>"
+          + "SELECT process_id, table_id, status, process_type, process_stage, 
execution_engine, "
+          + "create_time, finish_time, fail_message, summary "
+          + "FROM table_process WHERE table_id = #{tableId} "
+          + " <if test='processType != null'> AND process_type = 
#{processType}</if>"
+          + " <if test='status != null'> AND status = #{status}</if>"
+          + " ORDER BY process_id desc"
+          + "</script>")
+  @ResultMap("tableProcessMap")
+  List<TableProcessMeta> listProcessMeta(
+      @Param("tableId") long tableId,
+      @Param("processType") String processType,
+      @Param("status") ProcessStatus optimizingStatus);
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
new file mode 100644
index 000000000..6e822b0f6
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.process;
+
+import org.apache.amoro.process.ProcessStatus;
+
+import java.util.Map;
+
+public class TableProcessMeta {
+  private long processId;
+  private long tableId;
+  private ProcessStatus status;
+  private String processType;
+  private String processStage;
+  private String executionEngine;
+  private long createTime;
+  private long finishTime;
+  private String failMessage;
+  private Map<String, String> summary;
+
+  public long getProcessId() {
+    return processId;
+  }
+
+  public void setProcessId(long processId) {
+    this.processId = processId;
+  }
+
+  public long getTableId() {
+    return tableId;
+  }
+
+  public void setTableId(long tableId) {
+    this.tableId = tableId;
+  }
+
+  public ProcessStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(ProcessStatus status) {
+    this.status = status;
+  }
+
+  public String getProcessType() {
+    return processType;
+  }
+
+  public void setProcessType(String processType) {
+    this.processType = processType;
+  }
+
+  public String getProcessStage() {
+    return processStage;
+  }
+
+  public void setProcessStage(String processStage) {
+    this.processStage = processStage;
+  }
+
+  public String getExecutionEngine() {
+    return executionEngine;
+  }
+
+  public void setExecutionEngine(String executionEngine) {
+    this.executionEngine = executionEngine;
+  }
+
+  public long getCreateTime() {
+    return createTime;
+  }
+
+  public void setCreateTime(long createTime) {
+    this.createTime = createTime;
+  }
+
+  public String getFailMessage() {
+    return failMessage;
+  }
+
+  public void setFailMessage(String failMessage) {
+    this.failMessage = failMessage;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public Map<String, String> getSummary() {
+    return summary;
+  }
+
+  public void setSummary(Map<String, String> summary) {
+    this.summary = summary;
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
index 77f610b7c..329afc8ef 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
@@ -20,7 +20,8 @@ package org.apache.amoro.server.scheduler.inline;
 
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.server.persistence.PersistentBase;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.TableService;
 import org.apache.amoro.server.utils.SnowflakeIdGenerator;
@@ -72,19 +73,24 @@ public class OptimizingExpiringExecutor extends 
PeriodicTableScheduler {
       doAsTransaction(
           () ->
               doAs(
-                  OptimizingMapper.class,
+                  TableProcessMapper.class,
                   mapper ->
-                      mapper.deleteOptimizingProcessBefore(
+                      
mapper.deleteBefore(tableRuntime.getTableIdentifier().getId(), minProcessId)),
+          () ->
+              doAs(
+                  OptimizingProcessMapper.class,
+                  mapper ->
+                      mapper.deleteProcessStateBefore(
                           tableRuntime.getTableIdentifier().getId(), 
minProcessId)),
           () ->
               doAs(
-                  OptimizingMapper.class,
+                  OptimizingProcessMapper.class,
                   mapper ->
                       mapper.deleteTaskRuntimesBefore(
                           tableRuntime.getTableIdentifier().getId(), 
minProcessId)),
           () ->
               doAs(
-                  OptimizingMapper.class,
+                  OptimizingProcessMapper.class,
                   mapper ->
                       mapper.deleteOptimizingQuotaBefore(
                           tableRuntime.getTableIdentifier().getId(), 
minProcessId)));
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
index 68204e5eb..c34e82044 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
@@ -39,7 +39,7 @@ import org.apache.amoro.server.optimizing.TaskRuntime;
 import org.apache.amoro.server.persistence.StatedPersistentBase;
 import org.apache.amoro.server.persistence.TableRuntimeMeta;
 import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
 import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
 import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
 import org.apache.amoro.server.resource.OptimizerInstance;
@@ -332,7 +332,7 @@ public class DefaultOptimizingState extends 
StatedPersistentBase implements Proc
       taskQuotas.clear();
       taskQuotas.addAll(
           getAs(
-              OptimizingMapper.class,
+              OptimizingProcessMapper.class,
               mapper -> mapper.selectTaskQuotasByTime(tableIdentifier.getId(), 
minProcessId)));
     } finally {
       tableLock.unlock();
@@ -446,7 +446,7 @@ public class DefaultOptimizingState extends 
StatedPersistentBase implements Proc
   }
 
   public void addTaskQuota(TaskRuntime.TaskQuota taskQuota) {
-    doAsIgnoreError(OptimizingMapper.class, mapper -> 
mapper.insertTaskQuota(taskQuota));
+    doAsIgnoreError(OptimizingProcessMapper.class, mapper -> 
mapper.insertTaskQuota(taskQuota));
     taskQuotas.add(taskQuota);
     long validTime = System.currentTimeMillis() - 
AmoroServiceConstants.QUOTA_LOOK_BACK_TIME;
     this.taskQuotas.removeIf(task -> task.checkExpired(validTime));
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
index 826d59b7b..b3d9a03d5 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
@@ -42,7 +42,7 @@ import org.apache.amoro.server.optimizing.TaskRuntime;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.TableRuntimeMeta;
 import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
 import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
 import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
 import org.apache.amoro.server.resource.OptimizerInstance;
@@ -271,7 +271,7 @@ public class DefaultTableManager extends PersistentBase 
implements TableManager
 
     List<OptimizingTaskMeta> taskMetas =
         getAs(
-            OptimizingMapper.class,
+            OptimizingProcessMapper.class,
             m -> {
               if (processIds.isEmpty()) {
                 return Lists.newArrayList();
@@ -318,7 +318,9 @@ public class DefaultTableManager extends PersistentBase 
implements TableManager
     long calculatingStartTime = calculatingEndTime - 
AmoroServiceConstants.QUOTA_LOOK_BACK_TIME;
     long minProcessId = 
SnowflakeIdGenerator.getMinSnowflakeId(calculatingStartTime);
     List<TaskRuntime.TaskQuota> quotas =
-        getAs(OptimizingMapper.class, mapper -> 
mapper.selectTableQuotas(tableIds, minProcessId));
+        getAs(
+            OptimizingProcessMapper.class,
+            mapper -> mapper.selectTableQuotas(tableIds, minProcessId));
 
     return quotas.stream()
         .collect(Collectors.groupingBy(TaskRuntime.TaskQuota::getTableId, 
Collectors.toList()));
diff --git a/amoro-ams/src/main/resources/derby/ams-derby-init.sql 
b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
index 0e4e507e3..d04589f10 100644
--- a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
+++ b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
@@ -123,25 +123,32 @@ CREATE TABLE table_runtime (
     CONSTRAINT table_runtime_table_name_idx UNIQUE (catalog_name, db_name, 
table_name)
 );
 
-CREATE TABLE table_optimizing_process (
-    process_id          BIGINT NOT NULL,
-    table_id            BIGINT NOT NULL,
-    catalog_name        VARCHAR(64) NOT NULL,
-    db_name             VARCHAR(128) NOT NULL,
-    table_name          VARCHAR(256) NOT NULL,
-    target_snapshot_id  BIGINT NOT NULL,
-    target_change_snapshot_id  BIGINT NOT NULL,
-    status              VARCHAR(10) NOT NULL,
-    optimizing_type     VARCHAR(10) NOT NULL,
-    plan_time           TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-    end_time            TIMESTAMP DEFAULT NULL,
-    fail_reason         VARCHAR(4096),
-    rewrite_input       BLOB(64m),
-    summary             CLOB(64m),
-    from_sequence       CLOB(64m),
-    to_sequence         CLOB(64m),
-    CONSTRAINT table_optimizing_process_pk PRIMARY KEY (process_id)
+CREATE TABLE table_process (
+    process_id       BIGINT NOT NULL PRIMARY KEY,
+    table_id         BIGINT NOT NULL,
+    status           VARCHAR(64) NOT NULL,
+    process_type     VARCHAR(64) NOT NULL,
+    process_stage    VARCHAR(64) NOT NULL,
+    execution_engine VARCHAR(64) NOT NULL,
+    create_time      TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    finish_time      TIMESTAMP DEFAULT NULL,
+    fail_message     CLOB,
+    summary          CLOB(64m)
+);
+CREATE INDEX table_process_table_idx ON table_process (table_id, create_time);
+
+CREATE TABLE optimizing_process_state (
+    process_id                BIGINT NOT NULL PRIMARY KEY,
+    table_id                  BIGINT NOT NULL,
+    target_snapshot_id        BIGINT NOT NULL,
+    target_change_snapshot_id BIGINT NOT NULL,
+    rewrite_input             BLOB(64m),
+    from_sequence             CLOB(64m),
+    to_sequence               CLOB(64m)
 );
+CREATE INDEX optimizing_process_state_table_idx
+    ON optimizing_process_state (table_id);
+
 
 CREATE TABLE task_runtime (
     process_id      BIGINT NOT NULL,
diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql 
b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
index ee815fcc6..f1b6c3743 100644
--- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
+++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
@@ -135,26 +135,33 @@ CREATE TABLE `table_runtime`
     INDEX idx_optimizer_status_and_time (optimizing_status_code, 
optimizing_status_start_time DESC)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'Optimize running information 
of each table' ROW_FORMAT=DYNAMIC;
 
-CREATE TABLE `table_optimizing_process`
+CREATE TABLE `table_process`
+(
+    `process_id`                    bigint(20) NOT NULL COMMENT 'table process 
id',
+    `table_id`                      bigint(20) NOT NULL COMMENT 'table id',
+    `status`                        varchar(64) NOT NULL COMMENT 'Table 
optimizing status',
+    `process_type`                  varchar(64) NOT NULL COMMENT 'Process 
action type',
+    `process_stage`                 varchar(64) NOT NULL COMMENT 'Process 
current stage',
+    `execution_engine`              varchar(64) NOT NULL COMMENT 'Execution 
engine',
+    `create_time`                   timestamp DEFAULT CURRENT_TIMESTAMP 
COMMENT 'First plan time',
+    `finish_time`                   timestamp NULL DEFAULT NULL COMMENT 
'finish time or failed time',
+    `fail_message`                  mediumtext DEFAULT NULL COMMENT 'Error 
message after task failed',
+    `summary`                       mediumtext COMMENT 'Max change transaction 
id of these tasks',
+    PRIMARY KEY (`process_id`),
+    KEY  `table_index` (`table_id`, `create_time`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of optimizing after 
each commit';
+
+CREATE TABLE `optimizing_process_state`
 (
     `process_id`                    bigint(20) NOT NULL COMMENT 
'optimizing_procedure UUID',
     `table_id`                      bigint(20) NOT NULL,
-    `catalog_name`                  varchar(64) NOT NULL COMMENT 'Catalog 
name',
-    `db_name`                       varchar(128) NOT NULL COMMENT 'Database 
name',
-    `table_name`                    varchar(256) NOT NULL COMMENT 'Table name',
     `target_snapshot_id`            bigint(20) NOT NULL,
     `target_change_snapshot_id`     bigint(20) NOT NULL,
-    `status`                        varchar(10) NOT NULL COMMENT 'Direct to 
TableOptimizingStatus',
-    `optimizing_type`               varchar(10) NOT NULL COMMENT 'Optimize 
type: Major, Minor',
-    `plan_time`                     timestamp DEFAULT CURRENT_TIMESTAMP 
COMMENT 'First plan time',
-    `end_time`                      timestamp NULL DEFAULT NULL COMMENT 
'finish time or failed time',
-    `fail_reason`                   varchar(4096) DEFAULT NULL COMMENT 'Error 
message after task failed',
     `rewrite_input`                 longblob DEFAULT NULL COMMENT 'rewrite 
files input',
-    `summary`                       mediumtext COMMENT 'Max change transaction 
id of these tasks',
     `from_sequence`                 mediumtext COMMENT 'from or min sequence 
of each partition',
     `to_sequence`                   mediumtext COMMENT 'to or max sequence of 
each partition',
     PRIMARY KEY (`process_id`),
-    KEY  `table_index` (`table_id`, `plan_time`)
+    KEY  `table_index` (`table_id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of optimizing after 
each commit';
 
 CREATE TABLE `task_runtime`
diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql 
b/amoro-ams/src/main/resources/mysql/upgrade.sql
index 71bc576c5..ebbfc6010 100644
--- a/amoro-ams/src/main/resources/mysql/upgrade.sql
+++ b/amoro-ams/src/main/resources/mysql/upgrade.sql
@@ -23,4 +23,56 @@ ALTER TABLE `table_runtime` MODIFY COLUMN 
`optimizing_status_start_time` TIMESTA
 UPDATE `table_optimizing_process` SET `process_id` = `process_id` /10 << 13;
 UPDATE `task_runtime` SET `process_id` = `process_id` /10 << 13;
 UPDATE `optimizing_task_quota` SET `process_id` = `process_id` /10 << 13;
-UPDATE `table_runtime` SET `optimizing_process_id` = `optimizing_process_id` 
/10 << 13;
\ No newline at end of file
+UPDATE `table_runtime` SET `optimizing_process_id` = `optimizing_process_id` 
/10 << 13;
+
+CREATE TABLE `table_process`
+(
+    `process_id`                    bigint(20) NOT NULL COMMENT 'table process 
id',
+    `table_id`                      bigint(20) NOT NULL COMMENT 'table id',
+    `status`                        varchar(64) NOT NULL COMMENT 'Table 
optimizing status',
+    `process_type`                  varchar(64) NOT NULL COMMENT 'Process 
action type',
+    `process_stage`                 varchar(64) NOT NULL COMMENT 'Process 
current stage',
+    `execution_engine`              varchar(64) NOT NULL COMMENT 'Execution 
engine',
+    `create_time`                   timestamp DEFAULT CURRENT_TIMESTAMP 
COMMENT 'First plan time',
+    `finish_time`                   timestamp NULL DEFAULT NULL COMMENT 
'finish time or failed time',
+    `fail_message`                  mediumtext DEFAULT NULL COMMENT 'Error 
message after task failed',
+    `summary`                       mediumtext COMMENT 'Max change transaction 
id of these tasks',
+    PRIMARY KEY (`process_id`),
+    KEY  `table_index` (`table_id`, `create_time`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of optimizing after 
each commit';
+
+CREATE TABLE `optimizing_process_state`
+(
+    `process_id`                    bigint(20) NOT NULL COMMENT 
'optimizing_procedure UUID',
+    `table_id`                      bigint(20) NOT NULL,
+    `target_snapshot_id`            bigint(20) NOT NULL,
+    `target_change_snapshot_id`     bigint(20) NOT NULL,
+    `rewrite_input`                 longblob DEFAULT NULL COMMENT 'rewrite 
files input',
+    `from_sequence`                 mediumtext COMMENT 'from or min sequence 
of each partition',
+    `to_sequence`                   mediumtext COMMENT 'to or max sequence of 
each partition',
+    PRIMARY KEY (`process_id`),
+    KEY  `table_index` (`table_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of optimizing after 
each commit';
+
+INSERT INTO `table_process`
+(`process_id`, `table_id`, `status`, `process_type`,
+`process_stage`, `execution_engine`, `create_time`, `finish_time`, 
`fail_message`, `summary`)
+SELECT p.`process_id`, p.`table_id`, p.`status`, p.`optimizing_type`,
+CASE
+            WHEN t.`optimizing_status_code` = 700 THEN 'IDLE'
+            WHEN t.`optimizing_status_code` = 600 THEN 'PENDING'
+            WHEN t.`optimizing_status_code` = 500 THEN 'PLANNING'
+            WHEN t.`optimizing_status_code` = 400 THEN 'COMMITTING'
+            WHEN t.`optimizing_status_code` = 300 THEN 'MINOR_OPTIMIZING'
+            WHEN t.`optimizing_status_code` = 200 THEN 'MAJOR_OPTIMIZING'
+            WHEN t.`optimizing_status_code` = 100 THEN 'FULL_OPTIMIZING'
+END,
+ 'AMORO', p.`plan_time`, p.`end_time`, p.`fail_reason`, p.`summary`
+FROM `table_optimizing_process` p JOIN `table_runtime` t ON p.table_id = 
t.table_id;
+
+INSERT INTO `optimizing_process_state`
+(`process_id`, `table_id`, `target_snapshot_id`, `target_change_snapshot_id`, 
`rewrite_input`, `from_sequence`, `to_sequence`)
+SELECT `process_id`, `table_id`, `target_snapshot_id`, 
`target_change_snapshot_id`, `rewrite_input`, `from_sequence`, `to_sequence`
+FROM `table_optimizing_process`;
+
+DROP TABLE IF EXISTS `table_optimizing_process`;
diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql 
b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
index 6f31d469b..ae0382530 100644
--- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
+++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
@@ -215,45 +215,54 @@ COMMENT ON COLUMN table_runtime.pending_input IS 'Pending 
input data';
 COMMENT ON COLUMN table_runtime.table_summary IS 'Table summary data';
 CREATE INDEX idx_optimizer_status_and_time ON 
table_runtime(optimizing_status_code, optimizing_status_start_time DESC);
 
-CREATE TABLE table_optimizing_process
-(
-    process_id BIGINT NOT NULL,
-    table_id BIGINT NOT NULL,
-    catalog_name VARCHAR(64) NOT NULL,
-    db_name VARCHAR(128) NOT NULL,
-    table_name VARCHAR(256) NOT NULL,
-    target_snapshot_id BIGINT NOT NULL,
-    target_change_snapshot_id BIGINT NOT NULL,
-    status VARCHAR(10) NOT NULL,
-    optimizing_type VARCHAR(10) NOT NULL,
-    plan_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-    end_time TIMESTAMP,
-    fail_reason VARCHAR(4096),
-    rewrite_input BYTEA,
-    summary TEXT,
-    from_sequence TEXT,
-    to_sequence TEXT,
-    PRIMARY KEY (process_id)
+CREATE TABLE table_process (
+    process_id      bigserial PRIMARY KEY,
+    table_id        bigint NOT NULL,
+    status          varchar(64) NOT NULL,
+    process_type    varchar(64) NOT NULL,
+    process_stage   varchar(64) NOT NULL,
+    execution_engine varchar(64) NOT NULL,
+    create_time     timestamptz NOT NULL DEFAULT now(),
+    finish_time     timestamptz,
+    fail_message    text CHECK (length(fail_message) <= 4096),
+    summary         text,
+    CONSTRAINT table_process_unique UNIQUE (process_id)
+);
+
+CREATE INDEX table_process_table_idx ON table_process (table_id, create_time);
+
+COMMENT ON TABLE  table_process IS 'History of optimizing after each commit';
+COMMENT ON COLUMN table_process.process_id      IS 'table process id';
+COMMENT ON COLUMN table_process.table_id        IS 'table id';
+COMMENT ON COLUMN table_process.status          IS 'Table optimizing status';
+COMMENT ON COLUMN table_process.process_type    IS 'Process action type';
+COMMENT ON COLUMN table_process.process_stage   IS 'Process current stage';
+COMMENT ON COLUMN table_process.execution_engine IS 'Execution engine';
+COMMENT ON COLUMN table_process.create_time     IS 'First plan time';
+COMMENT ON COLUMN table_process.finish_time     IS 'finish time or failed 
time';
+COMMENT ON COLUMN table_process.fail_message    IS 'Error message after task 
failed';
+COMMENT ON COLUMN table_process.summary         IS 'Max change transaction id 
of these tasks';
+
+CREATE TABLE optimizing_process_state (
+    process_id                bigint PRIMARY KEY,
+    table_id                  bigint NOT NULL,
+    target_snapshot_id        bigint NOT NULL,
+    target_change_snapshot_id bigint NOT NULL,
+    rewrite_input             bytea,
+    from_sequence             text,
+    to_sequence               text
 );
-CREATE INDEX process_index ON table_optimizing_process (table_id, plan_time);
 
-COMMENT ON TABLE table_optimizing_process IS 'History of optimizing after each 
commit';
-COMMENT ON COLUMN table_optimizing_process.process_id IS 'Optimizing procedure 
UUID';
-COMMENT ON COLUMN table_optimizing_process.table_id IS 'Table ID';
-COMMENT ON COLUMN table_optimizing_process.catalog_name IS 'Catalog name';
-COMMENT ON COLUMN table_optimizing_process.db_name IS 'Database name';
-COMMENT ON COLUMN table_optimizing_process.table_name IS 'Table name';
-COMMENT ON COLUMN table_optimizing_process.target_snapshot_id IS 'Target 
snapshot ID';
-COMMENT ON COLUMN table_optimizing_process.target_change_snapshot_id IS 
'Target change snapshot ID';
-COMMENT ON COLUMN table_optimizing_process.status IS 'Optimizing status';
-COMMENT ON COLUMN table_optimizing_process.optimizing_type IS 'Optimizing 
type: Major, Minor';
-COMMENT ON COLUMN table_optimizing_process.plan_time IS 'Plan time';
-COMMENT ON COLUMN table_optimizing_process.end_time IS 'Finish time or failed 
time';
-COMMENT ON COLUMN table_optimizing_process.fail_reason IS 'Error message after 
task failure';
-COMMENT ON COLUMN table_optimizing_process.rewrite_input IS 'Rewrite input 
files';
-COMMENT ON COLUMN table_optimizing_process.summary IS 'Summary of optimizing 
tasks';
-COMMENT ON COLUMN table_optimizing_process.from_sequence IS 'From or min 
sequence of each partition';
-COMMENT ON COLUMN table_optimizing_process.to_sequence IS 'To or max sequence 
of each partition';
+CREATE INDEX optimizing_process_state_table_idx ON optimizing_process_state 
(table_id);
+
+COMMENT ON TABLE  optimizing_process_state IS 'History of optimizing after 
each commit';
+COMMENT ON COLUMN optimizing_process_state.process_id                IS 
'optimizing_procedure UUID';
+COMMENT ON COLUMN optimizing_process_state.table_id                  IS 'table 
id';
+COMMENT ON COLUMN optimizing_process_state.target_snapshot_id        IS 
'target snapshot id';
+COMMENT ON COLUMN optimizing_process_state.target_change_snapshot_id IS 
'target change snapshot id';
+COMMENT ON COLUMN optimizing_process_state.rewrite_input             IS 
'rewrite files input';
+COMMENT ON COLUMN optimizing_process_state.from_sequence             IS 'from 
or min sequence of each partition';
+COMMENT ON COLUMN optimizing_process_state.to_sequence               IS 'to or 
max sequence of each partition';
 
 CREATE TABLE task_runtime
 (
diff --git a/amoro-ams/src/main/resources/postgres/upgrade.sql 
b/amoro-ams/src/main/resources/postgres/upgrade.sql
index ec89d292b..eba85a18d 100644
--- a/amoro-ams/src/main/resources/postgres/upgrade.sql
+++ b/amoro-ams/src/main/resources/postgres/upgrade.sql
@@ -26,3 +26,94 @@ UPDATE table_optimizing_process SET process_id = process_id 
/10 << 13;
 UPDATE task_runtime SET process_id = process_id /10 << 13;
 UPDATE optimizing_task_quota SET process_id = process_id /10 << 13;
 UPDATE table_runtime SET optimizing_process_id = optimizing_process_id /10 << 
13;
+
+CREATE TABLE table_process (
+    process_id      bigserial PRIMARY KEY,
+    table_id        bigint NOT NULL,
+    status          varchar(64) NOT NULL,
+    process_type    varchar(64) NOT NULL,
+    process_stage   varchar(64) NOT NULL,
+    execution_engine varchar(64) NOT NULL,
+    create_time     timestamptz NOT NULL DEFAULT now(),
+    finish_time     timestamptz,
+    fail_message    text CHECK (length(fail_message) <= 4096),
+    summary         text,
+    CONSTRAINT table_process_unique UNIQUE (process_id)
+);
+
+CREATE INDEX table_process_table_idx ON table_process (table_id, create_time);
+
+COMMENT ON TABLE  table_process IS 'History of optimizing after each commit';
+COMMENT ON COLUMN table_process.process_id      IS 'table process id';
+COMMENT ON COLUMN table_process.table_id        IS 'table id';
+COMMENT ON COLUMN table_process.status          IS 'Table optimizing status';
+COMMENT ON COLUMN table_process.process_type    IS 'Process action type';
+COMMENT ON COLUMN table_process.process_stage   IS 'Process current stage';
+COMMENT ON COLUMN table_process.execution_engine IS 'Execution engine';
+COMMENT ON COLUMN table_process.create_time     IS 'First plan time';
+COMMENT ON COLUMN table_process.finish_time     IS 'finish time or failed 
time';
+COMMENT ON COLUMN table_process.fail_message    IS 'Error message after task 
failed';
+COMMENT ON COLUMN table_process.summary         IS 'Max change transaction id 
of these tasks';
+
+CREATE TABLE optimizing_process_state (
+    process_id                bigint PRIMARY KEY,
+    table_id                  bigint NOT NULL,
+    target_snapshot_id        bigint NOT NULL,
+    target_change_snapshot_id bigint NOT NULL,
+    rewrite_input             bytea,
+    from_sequence             text,
+    to_sequence               text
+);
+
+CREATE INDEX optimizing_process_state_table_idx ON optimizing_process_state 
(table_id);
+
+COMMENT ON TABLE  optimizing_process_state IS 'History of optimizing after 
each commit';
+COMMENT ON COLUMN optimizing_process_state.process_id                IS 
'optimizing_procedure UUID';
+COMMENT ON COLUMN optimizing_process_state.table_id                  IS 'table 
id';
+COMMENT ON COLUMN optimizing_process_state.target_snapshot_id        IS 
'target snapshot id';
+COMMENT ON COLUMN optimizing_process_state.target_change_snapshot_id IS 
'target change snapshot id';
+COMMENT ON COLUMN optimizing_process_state.rewrite_input             IS 
'rewrite files input';
+COMMENT ON COLUMN optimizing_process_state.from_sequence             IS 'from 
or min sequence of each partition';
+COMMENT ON COLUMN optimizing_process_state.to_sequence               IS 'to or 
max sequence of each partition';
+
+INSERT INTO table_process
+(process_id, table_id, status, process_type,
+ process_stage, execution_engine, create_time, finish_time, fail_message, 
summary)
+SELECT
+    p.process_id,
+    p.table_id,
+    p.status,
+    p.optimizing_type,
+    CASE t.optimizing_status_code
+        WHEN 700 THEN 'IDLE'
+        WHEN 600 THEN 'PENDING'
+        WHEN 500 THEN 'PLANNING'
+        WHEN 400 THEN 'COMMITTING'
+        WHEN 300 THEN 'MINOR_OPTIMIZING'
+        WHEN 200 THEN 'MAJOR_OPTIMIZING'
+        WHEN 100 THEN 'FULL_OPTIMIZING'
+    END,
+    'AMORO',
+    p.plan_time,
+    p.end_time,
+    p.fail_reason,
+    p.summary
+FROM table_optimizing_process AS p
+JOIN table_runtime AS t
+  ON p.table_id = t.table_id;
+
+INSERT INTO optimizing_process_state
+(process_id, table_id,
+ target_snapshot_id, target_change_snapshot_id,
+ rewrite_input, from_sequence, to_sequence)
+SELECT
+    process_id,
+    table_id,
+    target_snapshot_id,
+    target_change_snapshot_id,
+    rewrite_input,
+    from_sequence,
+    to_sequence
+FROM table_optimizing_process;
+
+DROP TABLE IF EXISTS table_optimizing_process;
\ No newline at end of file
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
index 6a88748ad..b8ecf8467 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
@@ -31,8 +31,9 @@ import 
org.apache.amoro.hive.formats.IcebergHiveCatalogTestHelper;
 import org.apache.amoro.optimizing.MetricsSummary;
 import org.apache.amoro.optimizing.OptimizingType;
 import org.apache.amoro.process.ProcessStatus;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
 import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
 import org.apache.amoro.server.table.DerbyPersistence;
 import org.apache.amoro.table.TableIdentifier;
 import org.apache.amoro.table.descriptor.FormatTableDescriptor;
@@ -306,17 +307,25 @@ public class TestIcebergServerTableDescriptor extends 
TestServerTableDescriptor
         Map<String, Long> fromSequence,
         Map<String, Long> toSequence) {
       doAs(
-          OptimizingMapper.class,
+          TableProcessMapper.class,
           mapper ->
-              mapper.insertOptimizingProcess(
-                  identifier,
+              mapper.insertProcess(
+                  identifier.getId(),
                   processId,
-                  targetSnapshotId,
-                  targetChangeSnapshotId,
                   status,
-                  type,
+                  type.name(),
+                  type.name(),
+                  "AMORO",
                   planTime,
-                  summary,
+                  summary.summaryAsMap(false)));
+      doAs(
+          OptimizingProcessMapper.class,
+          mapper ->
+              mapper.insertInternalProcessState(
+                  identifier.getId(),
+                  processId,
+                  targetSnapshotId,
+                  targetChangeSnapshotId,
                   fromSequence,
                   toSequence));
     }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
index a38842bf4..06ae381a5 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
@@ -18,10 +18,14 @@
 
 package org.apache.amoro.server.optimizing;
 
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.optimizing.MetricsSummary;
 import org.apache.amoro.optimizing.OptimizingType;
 import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.server.persistence.PersistentBase;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
+import org.apache.amoro.server.process.TableProcessMeta;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
 import org.apache.amoro.table.TableIdentifier;
 import org.apache.iceberg.data.Record;
@@ -87,45 +91,45 @@ public class BaseOptimizingChecker extends PersistentBase {
   }
 
   protected void assertOptimizingProcess(
-      OptimizingProcessMeta optimizingProcess,
+      TableProcessMeta optimizingProcess,
       OptimizingType optimizeType,
       int fileCntBefore,
       int fileCntAfter) {
     Assert.assertNotNull(optimizingProcess);
-    Assert.assertEquals(optimizeType, optimizingProcess.getOptimizingType());
+    Assert.assertEquals(optimizeType.name(), 
optimizingProcess.getProcessType());
+    MetricsSummary summary = 
MetricsSummary.fromMap(optimizingProcess.getSummary());
     Assert.assertEquals(
         fileCntBefore,
-        optimizingProcess.getSummary().getRewritePosDataFileCnt()
-            + optimizingProcess.getSummary().getRewriteDataFileCnt()
-            + optimizingProcess.getSummary().getEqDeleteFileCnt()
-            + optimizingProcess.getSummary().getPosDeleteFileCnt());
-    Assert.assertEquals(
-        fileCntAfter,
-        optimizingProcess.getSummary().getNewDataFileCnt()
-            + optimizingProcess.getSummary().getNewDeleteFileCnt());
+        summary.getRewritePosDataFileCnt()
+            + summary.getRewriteDataFileCnt()
+            + summary.getEqDeleteFileCnt()
+            + summary.getPosDeleteFileCnt());
+    Assert.assertEquals(fileCntAfter, summary.getNewDataFileCnt() + 
summary.getNewDeleteFileCnt());
   }
 
-  protected OptimizingProcessMeta waitOptimizeResult() {
+  protected TableProcessMeta waitOptimizeResult() {
     boolean success;
+    ServerTableIdentifier identifier =
+        getAs(
+            TableMetaMapper.class,
+            m ->
+                m.selectTableIdentifier(
+                    tableIdentifier.getCatalog(),
+                    tableIdentifier.getDatabase(),
+                    tableIdentifier.getTableName()));
     try {
       success =
           waitUntilFinish(
               () -> {
-                List<OptimizingProcessMeta> tableOptimizingProcesses =
+                List<TableProcessMeta> tableOptimizingProcesses =
                     getAs(
-                        OptimizingMapper.class,
-                        mapper ->
-                            mapper.selectOptimizingProcesses(
-                                tableIdentifier.getCatalog(),
-                                tableIdentifier.getDatabase(),
-                                tableIdentifier.getTableName(),
-                                null,
-                                null));
+                        TableProcessMapper.class,
+                        mapper -> mapper.listProcessMeta(identifier.getId(), 
null, null));
                 if (tableOptimizingProcesses == null || 
tableOptimizingProcesses.isEmpty()) {
                   LOG.info("optimize history is empty");
                   return Status.RUNNING;
                 }
-                Optional<OptimizingProcessMeta> any =
+                Optional<TableProcessMeta> any =
                     tableOptimizingProcesses.stream()
                         .filter(p -> p.getProcessId() > lastProcessId)
                         .filter(p -> 
p.getStatus().equals(ProcessStatus.SUCCESS))
@@ -137,7 +141,7 @@ public class BaseOptimizingChecker extends PersistentBase {
                   LOG.info(
                       "optimize max process id {}",
                       tableOptimizingProcesses.stream()
-                          .map(OptimizingProcessMeta::getProcessId)
+                          .map(TableProcessMeta::getProcessId)
                           .max(Comparator.naturalOrder())
                           .get());
                   return Status.RUNNING;
@@ -150,16 +154,10 @@ public class BaseOptimizingChecker extends PersistentBase 
{
     }
 
     if (success) {
-      List<OptimizingProcessMeta> result =
+      List<TableProcessMeta> result =
           getAs(
-                  OptimizingMapper.class,
-                  mapper ->
-                      mapper.selectOptimizingProcesses(
-                          tableIdentifier.getCatalog(),
-                          tableIdentifier.getDatabase(),
-                          tableIdentifier.getTableName(),
-                          null,
-                          null))
+                  TableProcessMapper.class,
+                  mapper -> mapper.listProcessMeta(identifier.getId(), null, 
null))
               .stream()
               .filter(p -> p.getProcessId() > lastProcessId)
               .filter(p -> p.getStatus().equals(ProcessStatus.SUCCESS))
@@ -181,16 +179,18 @@ public class BaseOptimizingChecker extends PersistentBase 
{
     } catch (InterruptedException e) {
       throw new IllegalStateException("waiting result was interrupted");
     }
-    List<OptimizingProcessMeta> tableOptimizingProcesses =
+    ServerTableIdentifier identifier =
+        getAs(
+            TableMetaMapper.class,
+            m ->
+                m.selectTableIdentifier(
+                    tableIdentifier.getCatalog(),
+                    tableIdentifier.getDatabase(),
+                    tableIdentifier.getTableName()));
+    List<TableProcessMeta> tableOptimizingProcesses =
         getAs(
-                OptimizingMapper.class,
-                mapper ->
-                    mapper.selectOptimizingProcesses(
-                        tableIdentifier.getCatalog(),
-                        tableIdentifier.getDatabase(),
-                        tableIdentifier.getTableName(),
-                        null,
-                        null))
+                TableProcessMapper.class,
+                mapper -> mapper.listProcessMeta(identifier.getId(), null, 
null))
             .stream()
             .filter(p -> p.getProcessId() > lastProcessId)
             .collect(Collectors.toList());
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestIcebergHadoopOptimizing.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestIcebergHadoopOptimizing.java
index 29ab60d53..3b2bc3b68 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestIcebergHadoopOptimizing.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestIcebergHadoopOptimizing.java
@@ -24,6 +24,7 @@ import org.apache.amoro.server.AmsEnvironment;
 import org.apache.amoro.server.RestCatalogService;
 import org.apache.amoro.server.catalog.InternalCatalog;
 import org.apache.amoro.server.catalog.ServerCatalog;
+import org.apache.amoro.server.process.TableProcessMeta;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -103,7 +104,7 @@ public class TestIcebergHadoopOptimizing extends 
AbstractOptimizingTest {
         partitionData);
 
     // wait Minor Optimize result
-    OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+    TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
     checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2, 
1);
     checker.assertIds(readRecords(table), 1, 2, 3, 4, 5, 6);
 
@@ -179,7 +180,7 @@ public class TestIcebergHadoopOptimizing extends 
AbstractOptimizingTest {
         partitionData);
 
     // wait Minor Optimize result
-    OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+    TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
     checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2, 
1);
     assertIds(readRecords(table), 1, 2, 3, 4, 5, 6);
 
@@ -221,7 +222,7 @@ public class TestIcebergHadoopOptimizing extends 
AbstractOptimizingTest {
         partitionData);
 
     // wait Minor Optimize result
-    OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+    TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
     checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2, 
1);
     assertIds(readRecords(table), 1, 2, 3, 4, 5, 6);
 
@@ -334,7 +335,7 @@ public class TestIcebergHadoopOptimizing extends 
AbstractOptimizingTest {
 
     updateProperties(table, 
TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO, "0");
 
-    OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+    TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
     checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 9, 
1);
 
     assertIds(readRecords(table), 4, 5);
@@ -368,7 +369,7 @@ public class TestIcebergHadoopOptimizing extends 
AbstractOptimizingTest {
     updateProperties(table, TableProperties.SELF_OPTIMIZING_MAX_FILE_CNT, "4");
 
     // wait Minor Optimize result
-    OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+    TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
     checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 6, 
3);
     assertIds(readRecords(table), 1, 2, 3, 4, 5, 6);
 
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedHiveOptimizing.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedHiveOptimizing.java
index 8f6dcd662..5555c4802 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedHiveOptimizing.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedHiveOptimizing.java
@@ -22,6 +22,7 @@ import org.apache.amoro.hive.table.SupportHive;
 import org.apache.amoro.io.AuthenticatedHadoopFileIO;
 import org.apache.amoro.io.MixedDataTestHelpers;
 import org.apache.amoro.optimizing.OptimizingType;
+import org.apache.amoro.server.process.TableProcessMeta;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Streams;
 import org.apache.amoro.table.KeyedTable;
@@ -58,7 +59,7 @@ public class TestMixedHiveOptimizing extends 
AbstractOptimizingTest {
     updateProperties(table, TableProperties.BASE_FILE_INDEX_HASH_BUCKET, 1 + 
"");
     writeBase(table, rangeFromTo(1, 100, "aaa", quickDateWithZone(3)));
     // wait Full Optimize result
-    OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+    TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
     checker.assertOptimizingProcess(optimizeHistory, OptimizingType.FULL, 1, 
1);
     assertIdRange(readRecords(table), 1, 100);
     // assert file are in hive location
@@ -98,7 +99,7 @@ public class TestMixedHiveOptimizing extends 
AbstractOptimizingTest {
     updateProperties(table, 
TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES, false + "");
     writeBase(table, rangeFromTo(1, 100, "aaa", quickDateWithZone(3)));
     // wait Full Optimize result
-    OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+    TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
     checker.assertOptimizingProcess(optimizeHistory, OptimizingType.FULL, 1, 
1);
     assertIdRange(readRecords(table), 1, 100);
     // assert file are in hive location
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedIcebergOptimizing.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedIcebergOptimizing.java
index 458b5444f..2b3f8e612 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedIcebergOptimizing.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedIcebergOptimizing.java
@@ -19,6 +19,7 @@
 package org.apache.amoro.server.optimizing;
 
 import org.apache.amoro.optimizing.OptimizingType;
+import org.apache.amoro.server.process.TableProcessMeta;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.table.KeyedTable;
 import org.apache.amoro.table.MixedTable;
@@ -58,7 +59,7 @@ public class TestMixedIcebergOptimizing extends 
AbstractOptimizingTest {
         null);
 
     // wait Minor Optimize result, no major optimize because there is only 1 
base file for each node
-    OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+    TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
     checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 4, 
4);
     assertIds(readRecords(table), 3, 4, 5, 6);
 
@@ -179,7 +180,7 @@ public class TestMixedIcebergOptimizing extends 
AbstractOptimizingTest {
     updateProperties(table, TableProperties.ENABLE_SELF_OPTIMIZING, "true");
     updateProperties(table, 
TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, "1000");
 
-    OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+    TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
     checker.assertOptimizingProcess(optimizeHistory, OptimizingType.FULL, 5, 
4);
     assertIds(
         readRecords(table), 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 
16, 17, 21, 25, 29);
@@ -240,7 +241,7 @@ public class TestMixedIcebergOptimizing extends 
AbstractOptimizingTest {
             newRecord(9, "hhh", quickDateWithZone(4)),
             newRecord(10, "iii", quickDateWithZone(4))));
     // wait Major Optimize result
-    OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+    TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
     checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 4, 
2);
     assertIds(readRecords(table), 3, 4, 5, 6, 7, 8, 9, 10);
 
@@ -281,7 +282,7 @@ public class TestMixedIcebergOptimizing extends 
AbstractOptimizingTest {
             newRecord(9, "hhh", quickDateWithZone(4)),
             newRecord(10, "iii", quickDateWithZone(4))));
     // wait Major Optimize result
-    OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+    TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
     checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2, 
1);
     assertIds(readRecords(table), 3, 4, 5, 6, 7, 8, 9, 10);
 
@@ -333,7 +334,7 @@ public class TestMixedIcebergOptimizing extends 
AbstractOptimizingTest {
     updateProperties(table, TableProperties.ENABLE_SELF_OPTIMIZING, "true");
 
     // wait Optimize result
-    OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+    TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
     checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 1, 
1);
     optimizeHistory = checker.waitOptimizeResult();
     checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 6, 
1);
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MetricsSummary.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MetricsSummary.java
index 55cad9784..2bda80015 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MetricsSummary.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MetricsSummary.java
@@ -74,6 +74,43 @@ public class MetricsSummary {
   private int newDeleteFileCnt = 0;
   private long newDeleteRecordCnt = 0;
 
+  public static MetricsSummary fromMap(Map<String, String> summary) { // Static factory: fills a fresh MetricsSummary from a string-valued summary map.
+    MetricsSummary metricsSummary = new MetricsSummary(); // summary must be non-null; keys absent from it fall back to "0".
+    metricsSummary.rewriteDataSize = 
Long.parseLong(summary.getOrDefault(INPUT_DATA_SIZE, "0"));
+    metricsSummary.rewriteDataFileCnt = // Values go through Integer.parseInt / Long.parseLong, so non-numeric text throws NumberFormatException.
+        Integer.parseInt(summary.getOrDefault(INPUT_DATA_FILES, "0"));
+    metricsSummary.rewriteDataRecordCnt =
+        Long.parseLong(summary.getOrDefault(INPUT_DATA_RECORDS, "0"));
+    metricsSummary.rewritePosDataSize = // NOTE(review): reads INPUT_POS_DELETE_SIZE, the same key as positionDeleteSize below - confirm intended.
+        Long.parseLong(summary.getOrDefault(INPUT_POS_DELETE_SIZE, "0"));
+    metricsSummary.rewritePosDataFileCnt = // NOTE(review): reads INPUT_POS_DELETE_FILES, the same key as posDeleteFileCnt below - confirm intended.
+        Integer.parseInt(summary.getOrDefault(INPUT_POS_DELETE_FILES, "0"));
+    metricsSummary.rewritePosDataRecordCnt = // NOTE(review): reads INPUT_POS_DELETE_RECORDS, the same key as posDeleteRecordCnt below - confirm intended.
+        Long.parseLong(summary.getOrDefault(INPUT_POS_DELETE_RECORDS, "0"));
+    metricsSummary.equalityDeleteSize =
+        Long.parseLong(summary.getOrDefault(INPUT_EQ_DELETE_SIZE, "0"));
+    metricsSummary.eqDeleteFileCnt =
+        Integer.parseInt(summary.getOrDefault(INPUT_EQ_DELETE_FILES, "0"));
+    metricsSummary.eqDeleteRecordCnt =
+        Long.parseLong(summary.getOrDefault(INPUT_EQ_DELETE_RECORDS, "0"));
+    metricsSummary.positionDeleteSize = // Second read of the INPUT_POS_DELETE_* keys (also used for rewritePosData* above).
+        Long.parseLong(summary.getOrDefault(INPUT_POS_DELETE_SIZE, "0"));
+    metricsSummary.posDeleteFileCnt =
+        Integer.parseInt(summary.getOrDefault(INPUT_POS_DELETE_FILES, "0"));
+    metricsSummary.posDeleteRecordCnt =
+        Long.parseLong(summary.getOrDefault(INPUT_POS_DELETE_RECORDS, "0"));
+    metricsSummary.newDataSize = 
Long.parseLong(summary.getOrDefault(OUTPUT_DATA_SIZE, "0"));
+    metricsSummary.newDataFileCnt = 
Integer.parseInt(summary.getOrDefault(OUTPUT_DATA_FILES, "0"));
+    metricsSummary.newDataRecordCnt =
+        Long.parseLong(summary.getOrDefault(OUTPUT_DATA_RECORDS, "0"));
+    metricsSummary.newDeleteSize = 
Long.parseLong(summary.getOrDefault(OUTPUT_DELETE_SIZE, "0"));
+    metricsSummary.newDeleteFileCnt =
+        Integer.parseInt(summary.getOrDefault(OUTPUT_DELETE_FILES, "0"));
+    metricsSummary.newDeleteRecordCnt =
+        Long.parseLong(summary.getOrDefault(OUTPUT_DELETE_RECORDS, "0"));
+    return metricsSummary; // Fields not assigned above keep the values set by the no-arg constructor / field initializers.
+  }
+
   public MetricsSummary() {}
 
   protected MetricsSummary(RewriteFilesInput input) {


Reply via email to