This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new d97597ffb [AMORO-3066] Optimizing the efficiency for 
optimizing-processes rest api (#3257)
d97597ffb is described below

commit d97597ffbda8adde13f6d679b8f8eeaa795184b4
Author: Congxian Qiu <[email protected]>
AuthorDate: Fri Oct 18 15:09:40 2024 +0800

    [AMORO-3066] Optimizing the efficiency for optimizing-processes rest api 
(#3257)
    
    * [AMORO-3066] Optimizing the efficiency for optimizing-processes rest api
    
    * fixup! [AMORO-3066] Optimizing the efficiency for optimizing-processes 
rest api
    
    * fixup! [AMORO-3066] Optimizing the efficiency for optimizing-processes 
rest api
    
    * fixup! [AMORO-3066] Optimizing the efficiency for optimizing-processes 
rest api
    
    * fixup! [AMORO-3066] Optimizing the efficiency for optimizing-processes 
rest api
    
    * fixup! [AMORO-3066] Optimizing the efficiency for optimizing-processes 
rest api
    
    * fixup! [AMORO-3066] Optimizing the efficiency for optimizing-processes 
rest api
---
 amoro-ams/pom.xml                                  |   6 +
 .../dashboard/MixedAndIcebergTableDescriptor.java  |  56 ++--
 .../dashboard/controller/TableController.java      |   8 +-
 .../amoro/server/optimizing/OptimizingProcess.java |   4 +-
 .../amoro/server/optimizing/OptimizingQueue.java   |  22 +-
 .../persistence/SqlSessionFactoryProvider.java     |  21 ++
 .../persistence/mapper/OptimizingMapper.java       |  20 +-
 .../executor/TableRuntimeRefreshExecutor.java      |   4 +-
 .../amoro/server/TestDefaultOptimizingService.java |   4 +-
 .../TestIcebergServerTableDescriptor.java          | 309 +++++++++++++++++++++
 .../server/optimizing/BaseOptimizingChecker.java   |  20 +-
 .../server/optimizing/TestOptimizingQueue.java     |   7 +-
 pom.xml                                            |   1 +
 13 files changed, 427 insertions(+), 55 deletions(-)

diff --git a/amoro-ams/pom.xml b/amoro-ams/pom.xml
index 1db3a8f81..e731558fd 100644
--- a/amoro-ams/pom.xml
+++ b/amoro-ams/pom.xml
@@ -394,6 +394,12 @@
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>com.github.pagehelper</groupId>
+            <artifactId>pagehelper</artifactId>
+            <version>${pagehelper.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-data</artifactId>
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
index 6d499259a..792a9170d 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
@@ -18,6 +18,9 @@
 
 package org.apache.amoro.server.dashboard;
 
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.api.CommitMetaProducer;
@@ -65,7 +68,6 @@ import org.apache.amoro.table.descriptor.TagOrBranchInfo;
 import org.apache.amoro.utils.MixedDataFiles;
 import org.apache.amoro.utils.MixedTableUtil;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.HasTableOperations;
@@ -505,29 +507,34 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
   public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
       AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit, 
int offset) {
     TableIdentifier tableIdentifier = amoroTable.id();
-    List<OptimizingProcessMeta> processMetaList =
-        getAs(
-            OptimizingMapper.class,
-            mapper ->
-                mapper.selectOptimizingProcesses(
-                    tableIdentifier.getCatalog(),
-                    tableIdentifier.getDatabase(),
-                    tableIdentifier.getTableName()));
-
-    processMetaList =
-        processMetaList.stream()
-            .filter(
-                p ->
-                    StringUtils.isBlank(type)
-                        || 
type.equalsIgnoreCase(p.getOptimizingType().getStatus().displayValue()))
-            .filter(p -> status == null || 
status.name().equalsIgnoreCase(p.getStatus().name()))
-            .collect(Collectors.toList());
-
-    int total = processMetaList.size();
-    processMetaList =
-        
processMetaList.stream().skip(offset).limit(limit).collect(Collectors.toList());
-    if (CollectionUtils.isEmpty(processMetaList)) {
-      return Pair.of(Collections.emptyList(), 0);
+    int total = 0;
+    // page helper is 1-based
+    int pageNumber = (offset / limit) + 1;
+    List<OptimizingProcessMeta> processMetaList = Collections.emptyList();
+    try (Page<?> ignored = PageHelper.startPage(pageNumber, limit, true)) {
+      processMetaList =
+          getAs(
+              OptimizingMapper.class,
+              mapper ->
+                  mapper.selectOptimizingProcesses(
+                      tableIdentifier.getCatalog(),
+                      tableIdentifier.getDatabase(),
+                      tableIdentifier.getTableName(),
+                      type,
+                      status,
+                      offset,
+                      limit));
+      PageInfo<OptimizingProcessMeta> pageInfo = new 
PageInfo<>(processMetaList);
+      total = (int) pageInfo.getTotal();
+      LOG.info(
+          "Get optimizing processes total : {} , pageNumber:{}, limit:{}, 
offset:{}",
+          total,
+          pageNumber,
+          limit,
+          offset);
+      if (pageInfo.getSize() == 0) {
+        return Pair.of(Collections.emptyList(), 0);
+      }
     }
     List<Long> processIds =
         processMetaList.stream()
@@ -537,6 +544,7 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
         getAs(OptimizingMapper.class, mapper -> 
mapper.selectOptimizeTaskMetas(processIds)).stream()
             .collect(Collectors.groupingBy(OptimizingTaskMeta::getProcessId));
 
+    LOG.info("Get {} optimizing tasks. ", optimizingTasks.size());
     return Pair.of(
         processMetaList.stream()
             .map(p -> buildOptimizingProcessInfo(p, 
optimizingTasks.get(p.getProcessId())))
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
index 9db6f074d..e00984084 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
@@ -310,16 +310,20 @@ public class TableController {
     String db = ctx.pathParam("db");
     String table = ctx.pathParam("table");
     String type = ctx.queryParam("type");
+
+    if (StringUtils.isBlank(type)) {
+      // treat all blank string to null
+      type = null;
+    }
+
     String status = ctx.queryParam("status");
     Integer page = ctx.queryParamAsClass("page", 
Integer.class).getOrDefault(1);
     Integer pageSize = ctx.queryParamAsClass("pageSize", 
Integer.class).getOrDefault(20);
 
     int offset = (page - 1) * pageSize;
     int limit = pageSize;
-    ServerCatalog serverCatalog = tableService.getServerCatalog(catalog);
     Preconditions.checkArgument(offset >= 0, "offset[%s] must >= 0", offset);
     Preconditions.checkArgument(limit >= 0, "limit[%s] must >= 0", limit);
-    Preconditions.checkState(serverCatalog.tableExists(db, table), "no such 
table");
 
     TableIdentifier tableIdentifier = TableIdentifier.of(catalog, db, table);
     ProcessStatus processStatus =
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java
index 5e0f47447..d8e0f58a4 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java
@@ -18,6 +18,8 @@
 
 package org.apache.amoro.server.optimizing;
 
+import org.apache.amoro.process.ProcessStatus;
+
 public interface OptimizingProcess {
 
   long getProcessId();
@@ -36,7 +38,7 @@ public interface OptimizingProcess {
 
   OptimizingType getOptimizingType();
 
-  Status getStatus();
+  ProcessStatus getStatus();
 
   long getRunningQuotaTime(long calculatingStartTime, long calculatingEndTime);
 
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index ccbe9ccd5..710ec729d 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -24,6 +24,7 @@ import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.api.OptimizingTaskId;
 import org.apache.amoro.exception.OptimizingClosedException;
 import org.apache.amoro.optimizing.RewriteFilesInput;
+import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.resource.ResourceGroup;
 import org.apache.amoro.server.AmoroServiceConstants;
 import org.apache.amoro.server.manager.MetricManager;
@@ -354,7 +355,7 @@ public class OptimizingQueue extends PersistentBase {
     private final Map<OptimizingTaskId, TaskRuntime<RewriteStageTask>> taskMap 
= Maps.newHashMap();
     private final Queue<TaskRuntime<RewriteStageTask>> taskQueue = new 
LinkedList<>();
     private final Lock lock = new ReentrantLock();
-    private volatile Status status = OptimizingProcess.Status.RUNNING;
+    private volatile ProcessStatus status = ProcessStatus.RUNNING;
     private volatile String failedReason;
     private long endTime = AmoroServiceConstants.INVALID_TIME;
     private Map<String, Long> fromSequence = Maps.newHashMap();
@@ -396,7 +397,7 @@ public class OptimizingQueue extends PersistentBase {
       if (tableRuntime.getToSequence() != null) {
         toSequence = tableRuntime.getToSequence();
       }
-      if (this.status != OptimizingProcess.Status.CLOSED) {
+      if (this.status != ProcessStatus.CLOSED) {
         tableRuntime.recover(this);
       }
       loadTaskRuntimes(this);
@@ -413,7 +414,7 @@ public class OptimizingQueue extends PersistentBase {
     }
 
     @Override
-    public Status getStatus() {
+    public ProcessStatus getStatus() {
       return status;
     }
 
@@ -421,10 +422,10 @@ public class OptimizingQueue extends PersistentBase {
     public void close() {
       lock.lock();
       try {
-        if (this.status != Status.RUNNING) {
+        if (this.status != ProcessStatus.RUNNING) {
           return;
         }
-        this.status = OptimizingProcess.Status.CLOSED;
+        this.status = ProcessStatus.CLOSED;
         this.endTime = System.currentTimeMillis();
         persistProcessCompleted(false);
         clearProcess(this);
@@ -468,7 +469,7 @@ public class OptimizingQueue extends PersistentBase {
           } else {
             clearProcess(this);
             this.failedReason = taskRuntime.getFailReason();
-            this.status = OptimizingProcess.Status.FAILED;
+            this.status = ProcessStatus.FAILED;
             this.endTime = taskRuntime.getEndTime();
             persistProcessCompleted(false);
           }
@@ -481,15 +482,14 @@ public class OptimizingQueue extends PersistentBase {
     // the cleanup of task should be done after unlock to avoid deadlock
     @Override
     public void releaseResourcesIfNecessary() {
-      if (this.status == OptimizingProcess.Status.FAILED
-          || this.status == OptimizingProcess.Status.CLOSED) {
+      if (this.status == ProcessStatus.FAILED || this.status == 
ProcessStatus.CLOSED) {
         cancelTasks();
       }
     }
 
     @Override
     public boolean isClosed() {
-      return status == OptimizingProcess.Status.CLOSED;
+      return status == ProcessStatus.CLOSED;
     }
 
     @Override
@@ -566,12 +566,12 @@ public class OptimizingQueue extends PersistentBase {
         try {
           hasCommitted = true;
           buildCommit().commit();
-          status = Status.SUCCESS;
+          status = ProcessStatus.SUCCESS;
           endTime = System.currentTimeMillis();
           persistProcessCompleted(true);
         } catch (Exception e) {
           LOG.error("{} Commit optimizing failed ", 
tableRuntime.getTableIdentifier(), e);
-          status = Status.FAILED;
+          status = ProcessStatus.FAILED;
           failedReason = ExceptionUtil.getErrorMessage(e, 4000);
           endTime = System.currentTimeMillis();
           persistProcessCompleted(false);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
index 4560e5a4e..14f4b8fdf 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
@@ -18,6 +18,12 @@
 
 package org.apache.amoro.server.persistence;
 
+import static com.github.pagehelper.page.PageAutoDialect.registerDialectAlias;
+
+import com.github.pagehelper.PageInterceptor;
+import com.github.pagehelper.dialect.helper.MySqlDialect;
+import com.github.pagehelper.dialect.helper.PostgreSqlDialect;
+import com.github.pagehelper.dialect.helper.SqlServer2012Dialect;
 import org.apache.amoro.config.Configurations;
 import org.apache.amoro.server.AmoroManagementConf;
 import org.apache.amoro.server.persistence.mapper.ApiTokensMapper;
@@ -78,6 +84,9 @@ public class SqlSessionFactoryProvider {
   private volatile SqlSessionFactory sqlSessionFactory;
 
   public void init(Configurations config) throws SQLException {
+
+    registerDialectAliases();
+
     BasicDataSource dataSource = new BasicDataSource();
     dataSource.setUrl(config.getString(AmoroManagementConf.DB_CONNECTION_URL));
     
dataSource.setDriverClassName(config.getString(AmoroManagementConf.DB_DRIVER_CLASS_NAME));
@@ -116,6 +125,12 @@ public class SqlSessionFactoryProvider {
     configuration.addMapper(ResourceMapper.class);
     configuration.addMapper(TableBlockerMapper.class);
 
+    PageInterceptor interceptor = new PageInterceptor();
+    Properties interceptorProperties = new Properties();
+    interceptorProperties.setProperty("reasonable", "false");
+    interceptor.setProperties(interceptorProperties);
+    configuration.addInterceptor(interceptor);
+
     DatabaseIdProvider provider = new VendorDatabaseIdProvider();
     Properties properties = new Properties();
     properties.setProperty("MySQL", "mysql");
@@ -133,6 +148,12 @@ public class SqlSessionFactoryProvider {
     createTablesIfNeed(config);
   }
 
+  private void registerDialectAliases() {
+    registerDialectAlias("postgres", PostgreSqlDialect.class);
+    registerDialectAlias("mysql", MySqlDialect.class);
+    registerDialectAlias("derby", SqlServer2012Dialect.class);
+  }
+
   /**
    * create tables for database
    *
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java
index 817e7cfd2..7de47f5b5 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java
@@ -20,8 +20,8 @@ package org.apache.amoro.server.persistence.mapper;
 
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.optimizing.RewriteFilesInput;
+import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.server.optimizing.MetricsSummary;
-import org.apache.amoro.server.optimizing.OptimizingProcess;
 import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
 import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
 import org.apache.amoro.server.optimizing.OptimizingType;
@@ -69,7 +69,7 @@ public interface OptimizingMapper {
       @Param("processId") long processId,
       @Param("targetSnapshotId") long targetSnapshotId,
       @Param("targetChangeSnapshotId") long targetChangeSnapshotId,
-      @Param("status") OptimizingProcess.Status status,
+      @Param("status") ProcessStatus status,
       @Param("optimizingType") OptimizingType optimizingType,
       @Param("planTime") long planTime,
       @Param("summary") MetricsSummary summary,
@@ -85,19 +85,23 @@ public interface OptimizingMapper {
   void updateOptimizingProcess(
       @Param("tableId") long tableId,
       @Param("processId") long processId,
-      @Param("optimizingStatus") OptimizingProcess.Status status,
+      @Param("optimizingStatus") ProcessStatus status,
       @Param("endTime") long endTime,
       @Param("summary") MetricsSummary summary,
       @Param("failedReason") String failedReason);
 
   @Select(
-      "SELECT a.process_id, a.table_id, a.catalog_name, a.db_name, 
a.table_name, a.target_snapshot_id,"
+      "<script>"
+          + "SELECT a.process_id, a.table_id, a.catalog_name, a.db_name, 
a.table_name, a.target_snapshot_id,"
           + " a.target_change_snapshot_id, a.status, a.optimizing_type, 
a.plan_time, a.end_time,"
           + " a.fail_reason, a.summary, a.from_sequence, a.to_sequence FROM 
table_optimizing_process a"
           + " INNER JOIN table_identifier b ON a.table_id = b.table_id"
           + " WHERE a.catalog_name = #{catalogName} AND a.db_name = #{dbName} 
AND a.table_name = #{tableName}"
           + " AND b.catalog_name = #{catalogName} AND b.db_name = #{dbName} 
AND b.table_name = #{tableName}"
-          + " ORDER BY process_id desc")
+          + " <if test='optimizingType != null'> AND a.optimizing_type = 
#{optimizingType}</if>"
+          + " <if test='optimizingStatus != null'> AND a.status = 
#{optimizingStatus}</if>"
+          + " ORDER BY process_id desc"
+          + "</script>")
   @Results({
     @Result(property = "processId", column = "process_id"),
     @Result(property = "tableId", column = "table_id"),
@@ -124,7 +128,11 @@ public interface OptimizingMapper {
   List<OptimizingProcessMeta> selectOptimizingProcesses(
       @Param("catalogName") String catalogName,
       @Param("dbName") String dbName,
-      @Param("tableName") String tableName);
+      @Param("tableName") String tableName,
+      @Param("optimizingType") String optimizingType,
+      @Param("optimizingStatus") ProcessStatus optimizingStatus,
+      @Param("pageNum") int pageNum,
+      @Param("pageSize") int pageSize);
 
   /** Optimizing TaskRuntime operation below */
   @Insert({
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
index e613027f0..b290323ab 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
@@ -20,6 +20,7 @@ package org.apache.amoro.server.table.executor;
 
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.server.optimizing.OptimizingProcess;
 import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator;
 import org.apache.amoro.server.table.TableManager;
@@ -73,8 +74,7 @@ public class TableRuntimeRefreshExecutor extends 
BaseTableExecutor {
     if (originalConfig.getOptimizingConfig().isEnabled()
         && 
!tableRuntime.getTableConfiguration().getOptimizingConfig().isEnabled()) {
       OptimizingProcess optimizingProcess = 
tableRuntime.getOptimizingProcess();
-      if (optimizingProcess != null
-          && optimizingProcess.getStatus() == 
OptimizingProcess.Status.RUNNING) {
+      if (optimizingProcess != null && optimizingProcess.getStatus() == 
ProcessStatus.RUNNING) {
         optimizingProcess.close();
       }
     }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 99b950a2b..0c15cbf6f 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -33,7 +33,7 @@ import org.apache.amoro.exception.PluginRetryAuthException;
 import org.apache.amoro.io.MixedDataTestHelpers;
 import org.apache.amoro.optimizing.RewriteFilesOutput;
 import org.apache.amoro.optimizing.TableOptimizing;
-import org.apache.amoro.server.optimizing.OptimizingProcess;
+import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
 import org.apache.amoro.server.optimizing.TaskRuntime;
 import org.apache.amoro.server.resource.OptimizerInstance;
@@ -386,7 +386,7 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
     Assertions.assertEquals(
         0, 
optimizingService().listTasks(defaultResourceGroup().getName()).size());
     Assertions.assertEquals(
-        OptimizingProcess.Status.RUNNING,
+        ProcessStatus.RUNNING,
         tableService()
             .getRuntime(serverTableIdentifier().getId())
             .getOptimizingProcess()
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
index 7bd401154..356d886b0 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
@@ -18,18 +18,55 @@
 
 package org.apache.amoro.server.dashboard;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import org.apache.amoro.AmoroTable;
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.config.Configurations;
 import org.apache.amoro.formats.AmoroCatalogTestHelper;
 import org.apache.amoro.formats.IcebergHadoopCatalogTestHelper;
+import org.apache.amoro.formats.iceberg.IcebergTable;
 import org.apache.amoro.hive.formats.IcebergHiveCatalogTestHelper;
+import org.apache.amoro.process.ProcessStatus;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.optimizing.MetricsSummary;
+import org.apache.amoro.server.optimizing.OptimizingType;
+import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
+import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.table.DerbyPersistence;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.table.TableIdentifier;
 import org.apache.amoro.table.descriptor.FormatTableDescriptor;
+import org.apache.amoro.table.descriptor.OptimizingProcessInfo;
 import org.apache.amoro.table.descriptor.TestServerTableDescriptor;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.ibatis.session.SqlSession;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 @RunWith(Parameterized.class)
 public class TestIcebergServerTableDescriptor extends 
TestServerTableDescriptor {
+  Persistency persistency = null;
 
   public TestIcebergServerTableDescriptor(AmoroCatalogTestHelper<?> 
amoroCatalogTestHelper) {
     super(amoroCatalogTestHelper);
@@ -51,6 +88,189 @@ public class TestIcebergServerTableDescriptor extends 
TestServerTableDescriptor
         .commit();
   }
 
+  @After
+  public void after() throws IOException {
+    if (persistency != null) {
+      persistency.truncateAllTables();
+    }
+    super.after();
+  }
+
+  @Test
+  public void testOptimizingProcess() {
+    Persistency persistency = new Persistency();
+
+    String catalogName = "catalog1";
+    String dbName = "db1";
+    String tableName = "table1";
+
+    ServerTableIdentifier identifier =
+        ServerTableIdentifier.of(1L, catalogName, dbName, tableName, 
TableFormat.ICEBERG);
+    persistency.insertTable(identifier);
+    MetricsSummary dummySummery = new MetricsSummary();
+    dummySummery.setNewDeleteFileCnt(1);
+    dummySummery.setNewFileCnt(1);
+    dummySummery.setNewFileSize(1);
+    dummySummery.setNewDeleteFileCnt(1);
+    dummySummery.setNewDeleteSize(1);
+    persistency.insertOptimizingProcess(
+        identifier,
+        1L,
+        1,
+        1,
+        ProcessStatus.SUCCESS,
+        OptimizingType.MAJOR,
+        1L,
+        dummySummery,
+        Collections.emptyMap(),
+        Collections.emptyMap());
+
+    persistency.insertOptimizingProcess(
+        identifier,
+        2L,
+        2L,
+        2L,
+        ProcessStatus.SUCCESS,
+        OptimizingType.MINOR,
+        2L,
+        dummySummery,
+        Collections.emptyMap(),
+        Collections.emptyMap());
+
+    persistency.insertOptimizingProcess(
+        identifier,
+        3L,
+        3L,
+        3L,
+        ProcessStatus.SUCCESS,
+        OptimizingType.FULL,
+        3L,
+        dummySummery,
+        Collections.emptyMap(),
+        Collections.emptyMap());
+
+    persistency.insertOptimizingProcess(
+        identifier,
+        4L,
+        4L,
+        4L,
+        ProcessStatus.FAILED,
+        OptimizingType.MAJOR,
+        4L,
+        dummySummery,
+        Collections.emptyMap(),
+        Collections.emptyMap());
+
+    persistency.insertOptimizingProcess(
+        identifier,
+        5L,
+        5L,
+        5L,
+        ProcessStatus.SUCCESS,
+        OptimizingType.MINOR,
+        5L,
+        dummySummery,
+        Collections.emptyMap(),
+        Collections.emptyMap());
+
+    persistency.insertOptimizingProcess(
+        identifier,
+        6L,
+        6L,
+        6L,
+        ProcessStatus.SUCCESS,
+        OptimizingType.FULL,
+        6L,
+        dummySummery,
+        Collections.emptyMap(),
+        Collections.emptyMap());
+
+    persistency.insertOptimizingProcess(
+        identifier,
+        7L,
+        7L,
+        7L,
+        ProcessStatus.FAILED,
+        OptimizingType.MAJOR,
+        7L,
+        dummySummery,
+        Collections.emptyMap(),
+        Collections.emptyMap());
+
+    persistency.insertOptimizingProcess(
+        identifier,
+        8L,
+        8L,
+        8L,
+        ProcessStatus.SUCCESS,
+        OptimizingType.MINOR,
+        8L,
+        dummySummery,
+        Collections.emptyMap(),
+        Collections.emptyMap());
+
+    persistency.insertOptimizingProcess(
+        identifier,
+        9L,
+        9L,
+        9L,
+        ProcessStatus.FAILED,
+        OptimizingType.FULL,
+        9L,
+        dummySummery,
+        Collections.emptyMap(),
+        Collections.emptyMap());
+
+    persistency.insertOptimizingProcess(
+        identifier,
+        10L,
+        10L,
+        10L,
+        ProcessStatus.SUCCESS,
+        OptimizingType.MINOR,
+        10L,
+        dummySummery,
+        Collections.emptyMap(),
+        Collections.emptyMap());
+
+    AmoroTable<?> table = mock(IcebergTable.class);
+    TableIdentifier tableIdentifier =
+        TableIdentifier.of(
+            identifier.getCatalog(), identifier.getDatabase(), 
identifier.getTableName());
+    doReturn(tableIdentifier).when(table).id();
+
+    Pair<List<OptimizingProcessInfo>, Integer> res =
+        persistency.getOptimizingProcessesInfo(table, null, null, 4, 4);
+    Integer expectReturnItemSizeForNoTypeNoStatusOffset0Limit5 = 4;
+    Integer expectTotalForNoTypeNoStatusOffset0Limit5 = 10;
+    Assert.assertEquals(
+        expectReturnItemSizeForNoTypeNoStatusOffset0Limit5, (Integer) 
res.getLeft().size());
+    Assert.assertEquals(expectTotalForNoTypeNoStatusOffset0Limit5, 
res.getRight());
+
+    res = persistency.getOptimizingProcessesInfo(table, null, 
ProcessStatus.SUCCESS, 5, 0);
+    Integer expectReturnItemSizeForOnlyStatusOffset0Limit5 = 5;
+    Integer expectedTotalForOnlyStatusOffset0Limit5 = 7;
+    Assert.assertEquals(
+        expectReturnItemSizeForOnlyStatusOffset0Limit5, (Integer) 
res.getLeft().size());
+    Assert.assertEquals(expectedTotalForOnlyStatusOffset0Limit5, 
res.getRight());
+
+    res = persistency.getOptimizingProcessesInfo(table, 
OptimizingType.MINOR.name(), null, 5, 0);
+    Integer expectedRetItemsSizeForOnlyTypeOffset0Limit5 = 4;
+    Integer expectedRetTotalForOnlyTypeOffset0Limit5 = 4;
+    Assert.assertEquals(
+        expectedRetItemsSizeForOnlyTypeOffset0Limit5, (Integer) 
res.getLeft().size());
+    Assert.assertEquals(expectedRetTotalForOnlyTypeOffset0Limit5, 
res.getRight());
+
+    res =
+        persistency.getOptimizingProcessesInfo(
+            table, OptimizingType.MINOR.name(), ProcessStatus.SUCCESS, 2, 2);
+    Integer expectedRetItemSizeForBothTypeAndStatusOffset2Limit2 = 2;
+    Integer expectedRetTotalForBothTypeAndStatusOffset2Limit2 = 4;
+    Assert.assertEquals(
+        expectedRetItemSizeForBothTypeAndStatusOffset2Limit2, (Integer) 
res.getLeft().size());
+    Assert.assertEquals(expectedRetTotalForBothTypeAndStatusOffset2Limit2, 
res.getRight());
+  }
+
   @Override
   protected void tableOperationsRenameColumns() {
     getTable().updateSchema().renameColumn("new_col", "renamed_col").commit();
@@ -87,4 +307,93 @@ public class TestIcebergServerTableDescriptor extends 
TestServerTableDescriptor
   private Table getTable() {
     return (Table) getAmoroCatalog().loadTable(TEST_DB, 
TEST_TABLE).originalTable();
   }
+
+  /**
+   * Test persistence class used to test MixedAndIcebergTableDescriptor, it 
will use derby as the
+   * underlying db.
+   */
+  private static class Persistency extends MixedAndIcebergTableDescriptor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DerbyPersistence.class);
+
+    private static TemporaryFolder SINGLETON_FOLDER;
+
+    Persistency() {
+      try {
+        SINGLETON_FOLDER = new TemporaryFolder();
+        SINGLETON_FOLDER.create();
+        String derbyFilePath = SINGLETON_FOLDER.newFolder("derby").getPath();
+        String derbyUrl = String.format("jdbc:derby:%s/derby;create=true", 
derbyFilePath);
+        Configurations configurations = new Configurations();
+        configurations.set(AmoroManagementConf.DB_CONNECTION_URL, derbyUrl);
+        configurations.set(AmoroManagementConf.DB_TYPE, 
AmoroManagementConf.DB_TYPE_DERBY);
+        configurations.set(
+            AmoroManagementConf.DB_DRIVER_CLASS_NAME, 
"org.apache.derby.jdbc.EmbeddedDriver");
+        SqlSessionFactoryProvider.getInstance().init(configurations);
+        LOG.info("Initialized derby persistent with url: {}", derbyUrl);
+        Runtime.getRuntime()
+            .addShutdownHook(
+                new Thread(
+                    () -> {
+                      SINGLETON_FOLDER.delete();
+                      LOG.info("Deleted resources in derby persistent.");
+                    }));
+        truncateAllTables();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    private void truncateAllTables() {
+      try (SqlSession sqlSession =
+          SqlSessionFactoryProvider.getInstance().get().openSession(true)) {
+        try (Connection connection = sqlSession.getConnection()) {
+          try (Statement statement = connection.createStatement()) {
+            String query = "SELECT TABLENAME FROM SYS.SYSTABLES WHERE 
TABLETYPE='T'";
+            List<String> tableList = Lists.newArrayList();
+            try (ResultSet rs = statement.executeQuery(query)) {
+              while (rs.next()) {
+                tableList.add(rs.getString(1));
+              }
+            }
+            for (String table : tableList) {
+              statement.execute("TRUNCATE TABLE " + table);
+            }
+          }
+        }
+      } catch (SQLException e) {
+        throw new RuntimeException("Clear table failed", e);
+      }
+    }
+
+    public void insertTable(ServerTableIdentifier identifier) {
+      doAs(TableMetaMapper.class, mapper -> mapper.insertTable(identifier));
+    }
+
+    public void insertOptimizingProcess(
+        ServerTableIdentifier identifier,
+        long processId,
+        long targetSnapshotId,
+        long targetChangeSnapshotId,
+        ProcessStatus status,
+        OptimizingType type,
+        long planTime,
+        MetricsSummary summary,
+        Map<String, Long> fromSequence,
+        Map<String, Long> toSequence) {
+      doAs(
+          OptimizingMapper.class,
+          mapper ->
+              mapper.insertOptimizingProcess(
+                  identifier,
+                  processId,
+                  targetSnapshotId,
+                  targetChangeSnapshotId,
+                  status,
+                  type,
+                  planTime,
+                  summary,
+                  fromSequence,
+                  toSequence));
+    }
+  }
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
index 965e7d43b..f4acb9913 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
@@ -116,7 +116,11 @@ public class BaseOptimizingChecker extends PersistentBase {
                             mapper.selectOptimizingProcesses(
                                 tableIdentifier.getCatalog(),
                                 tableIdentifier.getDatabase(),
-                                tableIdentifier.getTableName()));
+                                tableIdentifier.getTableName(),
+                                null,
+                                null,
+                                0,
+                                Integer.MAX_VALUE));
                 if (tableOptimizingProcesses == null || 
tableOptimizingProcesses.isEmpty()) {
                   LOG.info("optimize history is empty");
                   return Status.RUNNING;
@@ -153,7 +157,11 @@ public class BaseOptimizingChecker extends PersistentBase {
                       mapper.selectOptimizingProcesses(
                           tableIdentifier.getCatalog(),
                           tableIdentifier.getDatabase(),
-                          tableIdentifier.getTableName()))
+                          tableIdentifier.getTableName(),
+                          null,
+                          null,
+                          0,
+                          Integer.MAX_VALUE))
               .stream()
               .filter(p -> p.getProcessId() > lastProcessId)
               .filter(p -> 
p.getStatus().equals(OptimizingProcess.Status.SUCCESS))
@@ -182,11 +190,15 @@ public class BaseOptimizingChecker extends PersistentBase 
{
                     mapper.selectOptimizingProcesses(
                         tableIdentifier.getCatalog(),
                         tableIdentifier.getDatabase(),
-                        tableIdentifier.getTableName()))
+                        tableIdentifier.getTableName(),
+                        null,
+                        null,
+                        0,
+                        Integer.MAX_VALUE))
             .stream()
             .filter(p -> p.getProcessId() > lastProcessId)
             .collect(Collectors.toList());
-    Assert.assertFalse("optimize is not stopped", 
tableOptimizingProcesses.size() > 0);
+    Assert.assertTrue("optimize is not stopped", 
tableOptimizingProcesses.isEmpty());
   }
 
   protected boolean waitUntilFinish(Supplier<Status> statusSupplier, final 
long timeout)
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
index 24ce7c036..bf668fbcf 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
@@ -43,6 +43,7 @@ import org.apache.amoro.metrics.Gauge;
 import org.apache.amoro.metrics.MetricKey;
 import org.apache.amoro.optimizing.RewriteFilesOutput;
 import org.apache.amoro.optimizing.TableOptimizing;
+import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.resource.ResourceGroup;
 import org.apache.amoro.server.manager.MetricManager;
 import org.apache.amoro.server.metrics.MetricRegistry;
@@ -219,14 +220,14 @@ public class TestOptimizingQueue extends AMSTableTestBase 
{
 
     // 7.commit
     OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess();
-    Assert.assertEquals(OptimizingProcess.Status.RUNNING, 
optimizingProcess.getStatus());
+    Assert.assertEquals(ProcessStatus.RUNNING, optimizingProcess.getStatus());
     optimizingProcess.commit();
-    Assert.assertEquals(OptimizingProcess.Status.SUCCESS, 
optimizingProcess.getStatus());
+    Assert.assertEquals(ProcessStatus.SUCCESS, optimizingProcess.getStatus());
     Assert.assertNull(tableRuntime.getOptimizingProcess());
 
     // 8.commit again, throw exceptions, and status not changed.
     Assert.assertThrows(IllegalStateException.class, 
optimizingProcess::commit);
-    Assert.assertEquals(OptimizingProcess.Status.SUCCESS, 
optimizingProcess.getStatus());
+    Assert.assertEquals(ProcessStatus.SUCCESS, optimizingProcess.getStatus());
 
     Assert.assertEquals(0, queue.collectTasks().size());
     queue.dispose();
diff --git a/pom.xml b/pom.xml
index 2b7290b22..173d1d3ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,6 +149,7 @@
         <annotation-api.version>1.3.2</annotation-api.version>
         <guava.version>32.1.1-jre</guava.version>
         <hudi.version>0.14.1</hudi.version>
+        <pagehelper.version>6.1.0</pagehelper.version>
 
         <rocksdb-dependency-scope>compile</rocksdb-dependency-scope>
         <lucene-dependency-scope>compile</lucene-dependency-scope>


Reply via email to