This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 24705a529 Save the last completion time for each cleanup operation 
performed on each optimization table. (#3802)
24705a529 is described below

commit 24705a5290ba32f0cf3e1f87761b0f222b77fb20
Author: zhangwl9 <[email protected]>
AuthorDate: Thu Nov 20 20:29:54 2025 +0800

    Save the last completion time for each cleanup operation performed on each 
optimization table. (#3802)
    
    * Save the last completion time for each cleanup operation performed on 
each optimization table.
    
    # Conflicts:
    #       amoro-ams/src/main/resources/mysql/upgrade.sql
    
    * Store the execution time of each cleanup operation for the optimization 
table in the table_runtime_state table.
    
    * fixup style
    
    ---------
    
    Co-authored-by: 张文领 <[email protected]>
---
 .../server/scheduler/PeriodicTableScheduler.java   | 130 +++++++++-
 .../DanglingDeleteFilesCleaningExecutor.java       |  11 +
 .../scheduler/inline/DataExpiringExecutor.java     |  11 +
 .../inline/OrphanFilesCleaningExecutor.java        |  11 +
 .../inline/SnapshotsExpiringExecutor.java          |  11 +
 .../amoro/server/table/DefaultTableRuntime.java    |  51 +++-
 .../server/table/cleanup/CleanupOperation.java     |  30 +++
 .../table/cleanup/TableRuntimeCleanupState.java    |  58 +++++
 .../src/main/resources/mysql/ams-mysql-init.sql    |   2 -
 .../main/resources/postgres/ams-postgres-init.sql  |   1 -
 .../inline/PeriodicTableSchedulerTestBase.java     |  91 +++++++
 .../inline/TestPeriodicTableSchedulerCleanup.java  | 265 +++++++++++++++++++++
 12 files changed, 666 insertions(+), 6 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
index 5c8554273..ef7419325 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
@@ -25,8 +25,10 @@ import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
+import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.RuntimeHandlerChain;
 import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
 import 
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -84,18 +86,49 @@ public abstract class PeriodicTableScheduler extends 
RuntimeHandlerChain {
         .forEach(
             tableRuntime -> {
               if (scheduledTables.add(tableRuntime.getTableIdentifier())) {
-                executor.schedule(
-                    () -> executeTask(tableRuntime), getStartDelay(), 
TimeUnit.MILLISECONDS);
+                scheduleTableExecution(
+                    tableRuntime, calculateExecutionDelay(tableRuntime, 
getCleanupOperation()));
               }
             });
 
     logger.info("Table executor {} initialized", getClass().getSimpleName());
   }
 
+  private long calculateExecutionDelay(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+    // If the table needs to be executed immediately, schedule it to run after 
a short delay.
+    if (shouldExecuteTask(tableRuntime, cleanupOperation)) {
+      return getStartDelay();
+    }
+
+    // If the table does not need to be executed immediately, schedule it for 
the next execution
+    // time.
+    // Adding getStartDelay() helps distribute the execution time of multiple 
tables,
+    // reducing the probability of simultaneous execution and system load 
spikes.
+    return getNextExecutingTime(tableRuntime) + getStartDelay();
+  }
+
+  /**
+   * Schedule a table for execution with the specified delay.
+   *
+   * @param tableRuntime The table runtime to schedule
+   * @param delay The delay in milliseconds before execution
+   */
+  private void scheduleTableExecution(TableRuntime tableRuntime, long delay) {
+    executor.schedule(() -> executeTask(tableRuntime), delay, 
TimeUnit.MILLISECONDS);
+    logger.debug(
+        "Scheduled execution for table {} with delay {} ms",
+        tableRuntime.getTableIdentifier(),
+        delay);
+  }
+
   private void executeTask(TableRuntime tableRuntime) {
     try {
       if (isExecutable(tableRuntime)) {
         execute(tableRuntime);
+        // Each table finishes execute() at a different time, so the cleanup 
time is
+        // recorded separately for each table right after its own run 
completes.
+        persistUpdatingCleanupTime(tableRuntime);
       }
     } finally {
       scheduledTables.remove(tableRuntime.getTableIdentifier());
@@ -117,6 +150,99 @@ public abstract class PeriodicTableScheduler extends 
RuntimeHandlerChain {
 
   protected abstract void execute(TableRuntime tableRuntime);
 
+  protected boolean shouldExecute(Long lastCleanupEndTime) {
+    return true;
+  }
+
+  private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
+    CleanupOperation cleanupOperation = getCleanupOperation();
+    if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
+      return;
+    }
+
+    try {
+      long currentTime = System.currentTimeMillis();
+      ((DefaultTableRuntime) 
tableRuntime).updateLastCleanTime(cleanupOperation, currentTime);
+
+      logger.debug(
+          "Update lastCleanTime for table {} with cleanup operation {}",
+          tableRuntime.getTableIdentifier().getTableName(),
+          cleanupOperation);
+    } catch (Exception e) {
+      logger.error(
+          "Failed to update lastCleanTime for table {}",
+          tableRuntime.getTableIdentifier().getTableName(),
+          e);
+    }
+  }
+
+  /**
+   * Returns the cleanup operation. The default is NONE; subclasses should 
override this method
+   * to provide a specific operation.
+   *
+   * @return cleanup operation
+   */
+  protected CleanupOperation getCleanupOperation() {
+    return CleanupOperation.NONE;
+  }
+
+  protected boolean shouldExecuteTask(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+    if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
+      return true;
+    }
+
+    long lastCleanupEndTime =
+        ((DefaultTableRuntime) 
tableRuntime).getLastCleanTime(cleanupOperation);
+
+    // A value of zero means no cleanup time has been recorded yet, so execute the task.
+    if (lastCleanupEndTime == 0L) {
+      logger.debug(
+          "LastCleanupTime for table {} with operation {} is not exist, 
executing task",
+          tableRuntime.getTableIdentifier().getTableName(),
+          cleanupOperation);
+      return true;
+    }
+
+    // After ams restarts, certain cleanup operations can only be re-executed
+    // if sufficient time has elapsed since the last cleanup.
+    boolean result = shouldExecute(lastCleanupEndTime);
+    logger.debug(
+        result
+            ? "Should execute task for table {} with {}"
+            : "Not enough time has passed since last cleanup for table {} with 
{}, delaying execution",
+        tableRuntime.getTableIdentifier().getTableName(),
+        cleanupOperation);
+
+    return result;
+  }
+
+  /**
+   * Check if the operation should be skipped based on common conditions.
+   *
+   * @param tableRuntime the table runtime to check
+   * @param cleanupOperation the cleanup operation to perform
+   * @return true if the operation should be skipped, false otherwise
+   */
+  private boolean shouldSkipOperation(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+    if (cleanupOperation == CleanupOperation.NONE) {
+      logger.debug(
+          "No cleanup operation specified, skipping cleanup time check for 
table {}",
+          tableRuntime.getTableIdentifier().getTableName());
+      return true;
+    }
+
+    if (!(tableRuntime instanceof DefaultTableRuntime)) {
+      logger.debug(
+          "Table runtime is not DefaultTableRuntime, skipping cleanup time 
check for table {}",
+          tableRuntime.getTableIdentifier().getTableName());
+      return true;
+    }
+
+    return false;
+  }
+
   protected String getThreadName() {
     return String.join("-", 
StringUtils.splitByCharacterTypeCamelCase(getClass().getSimpleName()))
         .toLowerCase(Locale.ROOT);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
index 16f80c9c0..d7d8801f1 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
@@ -26,6 +26,7 @@ import 
org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +49,16 @@ public class DanglingDeleteFilesCleaningExecutor extends 
PeriodicTableScheduler
     return INTERVAL;
   }
 
+  @Override
+  protected boolean shouldExecute(Long lastCleanupEndTime) {
+    return System.currentTimeMillis() - lastCleanupEndTime >= INTERVAL;
+  }
+
+  @Override
+  protected CleanupOperation getCleanupOperation() {
+    return CleanupOperation.DANGLING_DELETE_FILES_CLEANING;
+  }
+
   @Override
   protected boolean enabled(TableRuntime tableRuntime) {
     return tableRuntime instanceof DefaultTableRuntime
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
index 4990b7409..61f45860b 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
@@ -25,6 +25,7 @@ import 
org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
 import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,16 @@ public class DataExpiringExecutor extends 
PeriodicTableScheduler {
     return interval.toMillis();
   }
 
+  @Override
+  protected boolean shouldExecute(Long lastCleanupEndTime) {
+    return System.currentTimeMillis() - lastCleanupEndTime >= 
interval.toMillis();
+  }
+
+  @Override
+  protected CleanupOperation getCleanupOperation() {
+    return CleanupOperation.DATA_EXPIRING;
+  }
+
   @Override
   protected boolean enabled(TableRuntime tableRuntime) {
     return 
tableRuntime.getTableConfiguration().getExpiringDataConfig().isEnabled();
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
index 332c51141..21d60cd10 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
@@ -25,6 +25,7 @@ import 
org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
 import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +46,16 @@ public class OrphanFilesCleaningExecutor extends 
PeriodicTableScheduler {
     return interval.toMillis();
   }
 
+  @Override
+  protected boolean shouldExecute(Long lastCleanupEndTime) {
+    return System.currentTimeMillis() - lastCleanupEndTime >= 
interval.toMillis();
+  }
+
+  @Override
+  protected CleanupOperation getCleanupOperation() {
+    return CleanupOperation.ORPHAN_FILES_CLEANING;
+  }
+
   @Override
   protected boolean enabled(TableRuntime tableRuntime) {
     return tableRuntime.getTableConfiguration().isCleanOrphanEnabled();
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
index f7d0cb927..15f2d49d9 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
@@ -25,6 +25,7 @@ import 
org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
 import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +56,16 @@ public class SnapshotsExpiringExecutor extends 
PeriodicTableScheduler {
     scheduleIfNecessary(tableRuntime, getStartDelay());
   }
 
+  @Override
+  protected boolean shouldExecute(Long lastCleanupEndTime) {
+    return System.currentTimeMillis() - lastCleanupEndTime >= INTERVAL;
+  }
+
+  @Override
+  protected CleanupOperation getCleanupOperation() {
+    return CleanupOperation.SNAPSHOTS_EXPIRING;
+  }
+
   @Override
   protected long getExecutorDelay() {
     return ThreadLocalRandom.current().nextLong(INTERVAL);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index f2845cc71..50bd12585 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -42,6 +42,8 @@ import 
org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
 import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
 import org.apache.amoro.server.resource.OptimizerInstance;
 import org.apache.amoro.server.table.blocker.TableBlocker;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
+import org.apache.amoro.server.table.cleanup.TableRuntimeCleanupState;
 import org.apache.amoro.server.utils.IcebergTableUtil;
 import org.apache.amoro.server.utils.SnowflakeIdGenerator;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
@@ -81,11 +83,17 @@ public class DefaultTableRuntime extends 
AbstractTableRuntime
           .jsonType(AbstractOptimizingEvaluator.PendingInput.class)
           .defaultValue(new AbstractOptimizingEvaluator.PendingInput());
 
+  private static final StateKey<TableRuntimeCleanupState> CLEANUP_STATE_KEY =
+      StateKey.stateKey("cleanup_state")
+          .jsonType(TableRuntimeCleanupState.class)
+          .defaultValue(new TableRuntimeCleanupState());
+
   private static final StateKey<Long> PROCESS_ID_KEY =
       StateKey.stateKey("process_id").longType().defaultValue(0L);
 
   public static final List<StateKey<?>> REQUIRED_STATES =
-      Lists.newArrayList(OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY, 
PROCESS_ID_KEY);
+      Lists.newArrayList(
+          OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY, PROCESS_ID_KEY, 
CLEANUP_STATE_KEY);
 
   private final Map<Action, TableProcessContainer> processContainerMap = 
Maps.newConcurrentMap();
   private final TableOptimizingMetrics optimizingMetrics;
@@ -353,6 +361,47 @@ public class DefaultTableRuntime extends 
AbstractTableRuntime
         .commit();
   }
 
+  public long getLastCleanTime(CleanupOperation operation) {
+    TableRuntimeCleanupState state = store().getState(CLEANUP_STATE_KEY);
+    switch (operation) {
+      case ORPHAN_FILES_CLEANING:
+        return state.getLastOrphanFilesCleanTime();
+      case DANGLING_DELETE_FILES_CLEANING:
+        return state.getLastDanglingDeleteFilesCleanTime();
+      case DATA_EXPIRING:
+        return state.getLastDataExpiringTime();
+      case SNAPSHOTS_EXPIRING:
+        return state.getLastSnapshotsExpiringTime();
+      default:
+        return 0L;
+    }
+  }
+
+  public void updateLastCleanTime(CleanupOperation operation, long time) {
+    store()
+        .begin()
+        .updateState(
+            CLEANUP_STATE_KEY,
+            state -> {
+              switch (operation) {
+                case ORPHAN_FILES_CLEANING:
+                  state.setLastOrphanFilesCleanTime(time);
+                  break;
+                case DANGLING_DELETE_FILES_CLEANING:
+                  state.setLastDanglingDeleteFilesCleanTime(time);
+                  break;
+                case DATA_EXPIRING:
+                  state.setLastDataExpiringTime(time);
+                  break;
+                case SNAPSHOTS_EXPIRING:
+                  state.setLastSnapshotsExpiringTime(time);
+                  break;
+              }
+              return state;
+            })
+        .commit();
+  }
+
   public void completeProcess(boolean success) {
     OptimizingStatus originalStatus = getOptimizingStatus();
     OptimizingType processType = optimizingProcess.getOptimizingType();
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
new file mode 100644
index 000000000..10afefe63
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.table.cleanup;
+
+/** Table cleanup operation enum. Defines different operation types for table 
cleanup tasks. */
+public enum CleanupOperation {
+  DANGLING_DELETE_FILES_CLEANING,
+  ORPHAN_FILES_CLEANING,
+  DATA_EXPIRING,
+  SNAPSHOTS_EXPIRING,
+  //  NONE indicates operation types where no cleanup process records are
+  //  saved in the table_runtime_state table.
+  NONE;
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
new file mode 100644
index 000000000..9dfb98f6e
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.table.cleanup;
+
+public class TableRuntimeCleanupState {
+  private long lastOrphanFilesCleanTime;
+  private long lastDanglingDeleteFilesCleanTime;
+  private long lastDataExpiringTime;
+  private long lastSnapshotsExpiringTime;
+
+  public long getLastOrphanFilesCleanTime() {
+    return lastOrphanFilesCleanTime;
+  }
+
+  public void setLastOrphanFilesCleanTime(long lastOrphanFilesCleanTime) {
+    this.lastOrphanFilesCleanTime = lastOrphanFilesCleanTime;
+  }
+
+  public long getLastDanglingDeleteFilesCleanTime() {
+    return lastDanglingDeleteFilesCleanTime;
+  }
+
+  public void setLastDanglingDeleteFilesCleanTime(long 
lastDanglingDeleteFilesCleanTime) {
+    this.lastDanglingDeleteFilesCleanTime = lastDanglingDeleteFilesCleanTime;
+  }
+
+  public long getLastDataExpiringTime() {
+    return lastDataExpiringTime;
+  }
+
+  public void setLastDataExpiringTime(long lastDataExpiringTime) {
+    this.lastDataExpiringTime = lastDataExpiringTime;
+  }
+
+  public long getLastSnapshotsExpiringTime() {
+    return lastSnapshotsExpiringTime;
+  }
+
+  public void setLastSnapshotsExpiringTime(long lastSnapshotsExpiringTime) {
+    this.lastSnapshotsExpiringTime = lastSnapshotsExpiringTime;
+  }
+}
diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql 
b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
index 3f4c9031e..b820a762d 100644
--- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
+++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
@@ -151,8 +151,6 @@ CREATE TABLE `table_process`
     KEY  `table_index` (`table_id`, `create_time`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of optimizing after 
each commit';
 
-
-
 CREATE TABLE `optimizing_process_state`
 (
     `process_id`                    bigint(20) NOT NULL COMMENT 
'optimizing_procedure UUID',
diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql 
b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
index bc4375cd8..5f4073ec7 100644
--- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
+++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
@@ -212,7 +212,6 @@ comment on column table_runtime_state.state_version is 
'Table runtime state vers
 comment on column table_runtime_state.create_time is 'create time';
 comment on column table_runtime_state.update_time is 'update time';
 
-
 CREATE TABLE table_process (
     process_id      bigserial PRIMARY KEY,
     table_id        bigint NOT NULL,
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
new file mode 100644
index 000000000..50c0e9d19
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.scheduler.inline;
+
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
+import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
+
+/**
+ * Test table executor implementation for testing PeriodicTableScheduler 
functionality. This class
+ * allows configuration of cleanup operations and enabled state for testing 
purposes.
+ */
+class PeriodicTableSchedulerTestBase extends PeriodicTableScheduler {
+  private final CleanupOperation cleanupOperation;
+  private final boolean enabled;
+  private static final long SNAPSHOTS_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 
1 hour
+  private static final long ORPHAN_FILES_CLEANING_INTERVAL = 24 * 60 * 60 * 
1000L; // 1 day
+  private static final long DANGLING_DELETE_FILES_CLEANING_INTERVAL = 24 * 60 
* 60 * 1000L;
+  private static final long DATA_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour
+
+  public PeriodicTableSchedulerTestBase(
+      TableService tableService, CleanupOperation cleanupOperation, boolean 
enabled) {
+    super(tableService, 1);
+    this.cleanupOperation = cleanupOperation;
+    this.enabled = enabled;
+  }
+
+  @Override
+  protected CleanupOperation getCleanupOperation() {
+    return cleanupOperation;
+  }
+
+  @Override
+  protected long getNextExecutingTime(TableRuntime tableRuntime) {
+    return 1000;
+  }
+
+  @Override
+  protected boolean enabled(TableRuntime tableRuntime) {
+    return enabled;
+  }
+
+  @Override
+  protected void execute(TableRuntime tableRuntime) {
+    // Do nothing in test
+  }
+
+  @Override
+  protected long getExecutorDelay() {
+    return 0;
+  }
+
+  @Override
+  protected boolean shouldExecute(Long lastCleanupEndTime) {
+    long currentTime = System.currentTimeMillis();
+    switch (cleanupOperation) {
+      case SNAPSHOTS_EXPIRING:
+        return currentTime - lastCleanupEndTime >= SNAPSHOTS_EXPIRING_INTERVAL;
+      case ORPHAN_FILES_CLEANING:
+        return currentTime - lastCleanupEndTime >= 
ORPHAN_FILES_CLEANING_INTERVAL;
+      case DANGLING_DELETE_FILES_CLEANING:
+        return currentTime - lastCleanupEndTime >= 
DANGLING_DELETE_FILES_CLEANING_INTERVAL;
+      case DATA_EXPIRING:
+        return currentTime - lastCleanupEndTime >= DATA_EXPIRING_INTERVAL;
+      default:
+        return true;
+    }
+  }
+
+  public boolean shouldExecuteTaskForTest(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+    return shouldExecuteTask(tableRuntime, cleanupOperation);
+  }
+}
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
new file mode 100644
index 000000000..c401c88d8
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.scheduler.inline;
+
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.server.persistence.TableRuntimeMeta;
+import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.DefaultTableRuntimeStore;
+import org.apache.amoro.server.table.TableRuntimeHandler;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
+import org.apache.amoro.table.TableRuntimeStore;
+import org.apache.amoro.table.TableSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests how {@link org.apache.amoro.server.scheduler.PeriodicTableScheduler} decides whether a
+ * cleanup operation should run, using {@link PeriodicTableSchedulerTestBase} as the scheduler
+ * under test and the last clean time set on a {@link DefaultTableRuntime}.
+ */
+public class TestPeriodicTableSchedulerCleanup extends PersistentBase {
+
+  private static final String TEST_CATALOG = "test_catalog";
+  private static final String TEST_DB = "test_db";
+  private static final String TEST_TABLE = "test_table";
+
+  static {
+    // Class.forName initializes the named class, so its static setup runs before any test here.
+    try {
+      Class.forName("org.apache.amoro.server.table.DerbyPersistence");
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize Derby persistence", e);
+    }
+  }
+
+  /** No-op handler. NOTE(review): not referenced anywhere in this class. */
+  private static final TableRuntimeHandler TEST_HANDLER =
+      new TableRuntimeHandler() {
+        @Override
+        public void handleTableChanged(
+            TableRuntime tableRuntime,
+            org.apache.amoro.server.optimizing.OptimizingStatus originalStatus) {}
+
+        @Override
+        public void handleTableChanged(
+            TableRuntime tableRuntime, TableConfiguration originalConfig) {}
+      };
+
+  /** Id of the single table every test case operates on. */
+  private static final long TEST_TABLE_ID = 1L;
+
+  /** {@link #TEST_TABLE_ID} wrapped as the list expected by the cleanup helpers. */
+  private static final List<Long> TEST_TABLE_IDS = Collections.singletonList(TEST_TABLE_ID);
+
+  /** Operations covered by the interval-based test cases; NONE is tested separately. */
+  private static final List<CleanupOperation> TIMED_OPERATIONS =
+      Arrays.asList(
+          CleanupOperation.ORPHAN_FILES_CLEANING,
+          CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
+          CleanupOperation.DATA_EXPIRING,
+          CleanupOperation.SNAPSHOTS_EXPIRING);
+
+  /** Age of a cleanup the tests treat as recent: 10 seconds. */
+  private static final long RECENT_CLEANUP_AGE_MS = 10000L;
+
+  /** Age of a cleanup the tests treat as old: 30 hours. */
+  private static final long OLD_CLEANUP_AGE_MS = 30 * 60 * 60 * 1000L;
+
+  /**
+   * Create a test server table identifier with the given ID
+   *
+   * @param tableId the table ID, also appended to the table name
+   * @return a ServerTableIdentifier instance in ICEBERG format
+   */
+  private ServerTableIdentifier createTableIdentifier(long tableId) {
+    return ServerTableIdentifier.of(
+        tableId, TEST_CATALOG, TEST_DB, TEST_TABLE + "_" + tableId, TableFormat.ICEBERG);
+  }
+
+  /**
+   * Create a test DefaultTableRuntime with the given identifier
+   *
+   * @param identifier the table identifier
+   * @return a DefaultTableRuntime backed by a new DefaultTableRuntimeStore
+   */
+  private DefaultTableRuntime createDefaultTableRuntime(ServerTableIdentifier identifier) {
+    // Create table runtime meta
+    TableRuntimeMeta meta = new TableRuntimeMeta();
+    meta.setTableId(identifier.getId());
+    meta.setGroupName("test_group");
+    meta.setStatusCode(0);
+    meta.setTableConfig(Collections.emptyMap());
+    meta.setTableSummary(new TableSummary());
+
+    // Create table runtime store
+    TableRuntimeStore store =
+        new DefaultTableRuntimeStore(
+            identifier, meta, DefaultTableRuntime.REQUIRED_STATES, Collections.emptyList());
+
+    return new DefaultTableRuntime(store);
+  }
+
+  /** Builds a fresh table runtime for {@link #TEST_TABLE_ID}. */
+  private DefaultTableRuntime createTestTableRuntime() {
+    return createDefaultTableRuntime(createTableIdentifier(TEST_TABLE_ID));
+  }
+
+  /**
+   * Deletes the runtime row, all table states and the table id entry for each given table.
+   * Failures for an individual table are ignored so the remaining tables are still processed.
+   *
+   * @param tableIds ids of the tables whose persisted data should be removed
+   */
+  private void cleanUpTableRuntimeData(List<Long> tableIds) {
+    doAs(
+        TableRuntimeMapper.class,
+        mapper -> {
+          for (Long tableId : tableIds) {
+            try {
+              mapper.deleteRuntime(tableId);
+              mapper.removeAllTableStates(tableId);
+            } catch (Exception ignored) {
+              // Ignore if tables don't exist
+            }
+          }
+        });
+    doAs(
+        TableMetaMapper.class,
+        mapper -> {
+          for (Long tableId : tableIds) {
+            try {
+              mapper.deleteTableIdById(tableId);
+            } catch (Exception ignored) {
+              // Ignore if tables don't exist
+            }
+          }
+        });
+  }
+
+  /**
+   * Prepare test environment by cleaning up test data and table runtime data
+   *
+   * @param testTableIds list of table IDs to clean up
+   */
+  private void prepareTestEnvironment(List<Long> testTableIds) {
+    cleanUpTableRuntimeData(testTableIds);
+  }
+
+  /**
+   * Create a test table executor
+   *
+   * @param cleanupOperation the cleanup operation to use
+   * @param enabled whether the executor should be enabled
+   * @return a new PeriodicTableSchedulerTestBase instance created with a null table service
+   */
+  private PeriodicTableSchedulerTestBase createTestExecutor(
+      CleanupOperation cleanupOperation, boolean enabled) {
+    return new PeriodicTableSchedulerTestBase(null, cleanupOperation, enabled);
+  }
+
+  /**
+   * Create a test table executor with default enabled state (true)
+   *
+   * @param cleanupOperation the cleanup operation to use
+   * @return a new PeriodicTableSchedulerTestBase instance
+   */
+  private PeriodicTableSchedulerTestBase createTestExecutor(CleanupOperation cleanupOperation) {
+    return createTestExecutor(cleanupOperation, true);
+  }
+
+  /** A table without any recorded cleanup time must be scheduled for every timed operation. */
+  @Test
+  public void testShouldExecuteTaskWithNoPreviousCleanup() {
+    for (CleanupOperation operation : TIMED_OPERATIONS) {
+      prepareTestEnvironment(TEST_TABLE_IDS);
+
+      PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
+      DefaultTableRuntime tableRuntime = createTestTableRuntime();
+
+      boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, operation);
+      Assert.assertTrue(
+          "Should execute when there's no previous cleanup time for operation " + operation,
+          shouldExecute);
+    }
+  }
+
+  /** Test should not execute task with recent cleanup */
+  @Test
+  public void testShouldNotExecuteTaskWithRecentCleanup() {
+    for (CleanupOperation operation : TIMED_OPERATIONS) {
+      prepareTestEnvironment(TEST_TABLE_IDS);
+
+      PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
+      DefaultTableRuntime tableRuntime = createTestTableRuntime();
+
+      // Simulate recent cleanup
+      long recentTime = System.currentTimeMillis() - RECENT_CLEANUP_AGE_MS;
+      tableRuntime.updateLastCleanTime(operation, recentTime);
+
+      boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, operation);
+      Assert.assertFalse(
+          "Should not execute when recently cleaned up for operation " + operation, shouldExecute);
+    }
+  }
+
+  /** Test should execute task with old cleanup */
+  @Test
+  public void testShouldExecuteTaskWithOldCleanup() {
+    for (CleanupOperation operation : TIMED_OPERATIONS) {
+      prepareTestEnvironment(TEST_TABLE_IDS);
+
+      PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
+      DefaultTableRuntime tableRuntime = createTestTableRuntime();
+
+      // Simulate old cleanup time (30 hours ago)
+      long oldTime = System.currentTimeMillis() - OLD_CLEANUP_AGE_MS;
+      tableRuntime.updateLastCleanTime(operation, oldTime);
+
+      boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, operation);
+      Assert.assertTrue(
+          "Should execute when enough time has passed since last cleanup for operation "
+              + operation,
+          shouldExecute);
+    }
+  }
+
+  /** The NONE operation must be scheduled even though no cleanup time has been recorded. */
+  @Test
+  public void testShouldExecuteTaskWithNoneOperation() {
+    prepareTestEnvironment(TEST_TABLE_IDS);
+
+    PeriodicTableSchedulerTestBase executor = createTestExecutor(CleanupOperation.NONE);
+    DefaultTableRuntime tableRuntime = createTestTableRuntime();
+
+    // Should always execute with NONE operation
+    boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, CleanupOperation.NONE);
+    Assert.assertTrue("Should always execute with NONE operation", shouldExecute);
+  }
+}


Reply via email to