This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 24705a529 Save the last completion time for each cleanup operation
performed on each optimization table. (#3802)
24705a529 is described below
commit 24705a5290ba32f0cf3e1f87761b0f222b77fb20
Author: zhangwl9 <[email protected]>
AuthorDate: Thu Nov 20 20:29:54 2025 +0800
Save the last completion time for each cleanup operation performed on each
optimization table. (#3802)
* Save the last completion time for each cleanup operation performed on
each optimization table.
# Conflicts:
# amoro-ams/src/main/resources/mysql/upgrade.sql
* Store the execution time of each cleanup operation for the optimization
table in the table_runtime_state table.
* fixup style
---------
Co-authored-by: 张文领 <[email protected]>
---
.../server/scheduler/PeriodicTableScheduler.java | 130 +++++++++-
.../DanglingDeleteFilesCleaningExecutor.java | 11 +
.../scheduler/inline/DataExpiringExecutor.java | 11 +
.../inline/OrphanFilesCleaningExecutor.java | 11 +
.../inline/SnapshotsExpiringExecutor.java | 11 +
.../amoro/server/table/DefaultTableRuntime.java | 51 +++-
.../server/table/cleanup/CleanupOperation.java | 30 +++
.../table/cleanup/TableRuntimeCleanupState.java | 58 +++++
.../src/main/resources/mysql/ams-mysql-init.sql | 2 -
.../main/resources/postgres/ams-postgres-init.sql | 1 -
.../inline/PeriodicTableSchedulerTestBase.java | 91 +++++++
.../inline/TestPeriodicTableSchedulerCleanup.java | 265 +++++++++++++++++++++
12 files changed, 666 insertions(+), 6 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
index 5c8554273..ef7419325 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
@@ -25,8 +25,10 @@ import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.server.optimizing.OptimizingStatus;
+import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
import
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -84,18 +86,49 @@ public abstract class PeriodicTableScheduler extends
RuntimeHandlerChain {
.forEach(
tableRuntime -> {
if (scheduledTables.add(tableRuntime.getTableIdentifier())) {
- executor.schedule(
- () -> executeTask(tableRuntime), getStartDelay(),
TimeUnit.MILLISECONDS);
+ scheduleTableExecution(
+ tableRuntime, calculateExecutionDelay(tableRuntime,
getCleanupOperation()));
}
});
logger.info("Table executor {} initialized", getClass().getSimpleName());
}
+  /**
+   * Computes the initial delay for a table's first run after this scheduler is initialized.
+   *
+   * <p>If the task should run now (see {@link #shouldExecuteTask}), only {@link #getStartDelay()}
+   * is applied. Otherwise the table waits only for the time still remaining until {@link
+   * #getNextExecutingTime(TableRuntime)} has elapsed since the last recorded cleanup, instead of a
+   * full interval counted from now (which would stretch the gap between two runs to almost two
+   * intervals). {@link #getStartDelay()} is added in both cases to spread the runs of many tables
+   * over time and reduce the chance of load spikes.
+   *
+   * @param tableRuntime the table being scheduled
+   * @param cleanupOperation the cleanup operation whose last completion time is consulted
+   * @return delay in milliseconds before the first run
+   */
+  private long calculateExecutionDelay(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+    if (shouldExecuteTask(tableRuntime, cleanupOperation)) {
+      return getStartDelay();
+    }
+
+    // NOTE(review): getNextExecutingTime() is treated as this scheduler's interval in ms, i.e. the
+    // same value the executors compare against in shouldExecute() -- confirm for new subclasses.
+    long interval = getNextExecutingTime(tableRuntime);
+    long remaining = interval;
+    if (tableRuntime instanceof DefaultTableRuntime) {
+      long lastCleanTime =
+          ((DefaultTableRuntime) tableRuntime).getLastCleanTime(cleanupOperation);
+      long elapsed = System.currentTimeMillis() - lastCleanTime;
+      // Clamp to [0, interval]: a last-clean time in the future (clock skew) never waits longer
+      // than one interval, and an interval that already elapsed runs right after the start delay.
+      remaining = Math.max(0L, Math.min(interval, interval - elapsed));
+    }
+    return remaining + getStartDelay();
+  }
+
+  /**
+   * Queues one run of {@code executeTask} for the given table on this scheduler's executor.
+   *
+   * @param tableRuntime the table whose task is queued
+   * @param delay how long to wait before the run, in milliseconds
+   */
+  private void scheduleTableExecution(TableRuntime tableRuntime, long delay) {
+    Runnable task = () -> executeTask(tableRuntime);
+    executor.schedule(task, delay, TimeUnit.MILLISECONDS);
+    logger.debug(
+        "Scheduled execution for table {} with delay {} ms",
+        tableRuntime.getTableIdentifier(),
+        delay);
+  }
+
private void executeTask(TableRuntime tableRuntime) {
try {
if (isExecutable(tableRuntime)) {
execute(tableRuntime);
+ // Different tables take different amounts of time to execute the end
of execute(),
+ // so you need to perform the update operation separately for each
table.
+ persistUpdatingCleanupTime(tableRuntime);
}
} finally {
scheduledTables.remove(tableRuntime.getTableIdentifier());
@@ -117,6 +150,99 @@ public abstract class PeriodicTableScheduler extends
RuntimeHandlerChain {
protected abstract void execute(TableRuntime tableRuntime);
+ protected boolean shouldExecute(Long lastCleanupEndTime) {
+ return true;
+ }
+
+  /**
+   * Records the current wall-clock time as the last completion time of this scheduler's cleanup
+   * operation for the table. Nothing is recorded when {@code shouldSkipOperation} applies. Any
+   * exception is logged and not rethrown, so it does not propagate into {@code executeTask}.
+   *
+   * @param tableRuntime the table whose cleanup has just finished
+   */
+  private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
+    CleanupOperation operation = getCleanupOperation();
+    if (!shouldSkipOperation(tableRuntime, operation)) {
+      try {
+        DefaultTableRuntime defaultRuntime = (DefaultTableRuntime) tableRuntime;
+        defaultRuntime.updateLastCleanTime(operation, System.currentTimeMillis());
+        logger.debug(
+            "Update lastCleanTime for table {} with cleanup operation {}",
+            tableRuntime.getTableIdentifier().getTableName(),
+            operation);
+      } catch (Exception e) {
+        logger.error(
+            "Failed to update lastCleanTime for table {}",
+            tableRuntime.getTableIdentifier().getTableName(),
+            e);
+      }
+    }
+  }
+
+ /**
+ * Get cleanup operation. Default is NONE, subclasses should override this
method to provide
+ * specific operation.
+ *
+ * @return cleanup operation
+ */
+ protected CleanupOperation getCleanupOperation() {
+ return CleanupOperation.NONE;
+ }
+
+  /**
+   * Decides whether the given table should run {@code cleanupOperation} right away.
+   *
+   * <p>Returns {@code true} when there is no persisted completion time to consult: the operation
+   * is {@link CleanupOperation#NONE}, the runtime is not a {@link DefaultTableRuntime}, or no
+   * completion time has been recorded yet (stored value {@code 0}). Otherwise the decision is
+   * delegated to {@link #shouldExecute(Long)}.
+   *
+   * @param tableRuntime the table to evaluate
+   * @param cleanupOperation the cleanup operation whose last completion time is consulted
+   * @return {@code true} to run now, {@code false} to delay the run
+   */
+  protected boolean shouldExecuteTask(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+    if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
+      return true;
+    }
+
+    long lastCleanupEndTime =
+        ((DefaultTableRuntime) tableRuntime).getLastCleanTime(cleanupOperation);
+
+    // Zero means no completion time has been recorded yet, so run immediately.
+    if (lastCleanupEndTime == 0L) {
+      logger.debug(
+          "LastCleanupTime for table {} with operation {} does not exist, executing task",
+          tableRuntime.getTableIdentifier().getTableName(),
+          cleanupOperation);
+      return true;
+    }
+
+    // After AMS restarts, certain cleanup operations can only be re-executed
+    // if sufficient time has elapsed since the last cleanup.
+    boolean result = shouldExecute(lastCleanupEndTime);
+    logger.debug(
+        result
+            ? "Should execute task for table {} with {}"
+            : "Not enough time has passed since last cleanup for table {} with {}, "
+                + "delaying execution",
+        tableRuntime.getTableIdentifier().getTableName(),
+        cleanupOperation);
+
+    return result;
+  }
+
+  /**
+   * Reports whether the persisted cleanup-time bookkeeping does not apply to this call. That is
+   * the case when no operation is configured ({@link CleanupOperation#NONE}), or when the runtime
+   * is not a {@link DefaultTableRuntime} (the type exposing the cleanup-time accessors).
+   *
+   * @param tableRuntime the table runtime to check
+   * @param cleanupOperation the cleanup operation in question
+   * @return {@code true} if the bookkeeping should be skipped, {@code false} otherwise
+   */
+  private boolean shouldSkipOperation(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+    if (cleanupOperation == CleanupOperation.NONE) {
+      logger.debug(
+          "No cleanup operation specified, skipping cleanup time check for table {}",
+          tableRuntime.getTableIdentifier().getTableName());
+      return true;
+    }
+
+    if (tableRuntime instanceof DefaultTableRuntime) {
+      return false;
+    }
+
+    logger.debug(
+        "Table runtime is not DefaultTableRuntime, skipping cleanup time check for table {}",
+        tableRuntime.getTableIdentifier().getTableName());
+    return true;
+  }
+
protected String getThreadName() {
return String.join("-",
StringUtils.splitByCharacterTypeCamelCase(getClass().getSimpleName()))
.toLowerCase(Locale.ROOT);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
index 16f80c9c0..d7d8801f1 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
@@ -26,6 +26,7 @@ import
org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,16 @@ public class DanglingDeleteFilesCleaningExecutor extends
PeriodicTableScheduler
return INTERVAL;
}
+  /** Allows a new run only once at least {@code INTERVAL} has passed since the last one. */
+  @Override
+  protected boolean shouldExecute(Long lastCleanupEndTime) {
+    long elapsed = System.currentTimeMillis() - lastCleanupEndTime;
+    return elapsed >= INTERVAL;
+  }
+
+  /** Dangling delete file cleaning records its completion time under its own operation key. */
+  @Override
+  protected CleanupOperation getCleanupOperation() {
+    return CleanupOperation.DANGLING_DELETE_FILES_CLEANING;
+  }
+
@Override
protected boolean enabled(TableRuntime tableRuntime) {
return tableRuntime instanceof DefaultTableRuntime
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
index 4990b7409..61f45860b 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
@@ -25,6 +25,7 @@ import
org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,16 @@ public class DataExpiringExecutor extends
PeriodicTableScheduler {
return interval.toMillis();
}
+  /** Allows a new run only once the configured {@code interval} has passed since the last one. */
+  @Override
+  protected boolean shouldExecute(Long lastCleanupEndTime) {
+    long elapsed = System.currentTimeMillis() - lastCleanupEndTime;
+    return elapsed >= interval.toMillis();
+  }
+
+  /** Data expiration records its completion time under its own operation key. */
+  @Override
+  protected CleanupOperation getCleanupOperation() {
+    return CleanupOperation.DATA_EXPIRING;
+  }
+
@Override
protected boolean enabled(TableRuntime tableRuntime) {
return
tableRuntime.getTableConfiguration().getExpiringDataConfig().isEnabled();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
index 332c51141..21d60cd10 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
@@ -25,6 +25,7 @@ import
org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +46,16 @@ public class OrphanFilesCleaningExecutor extends
PeriodicTableScheduler {
return interval.toMillis();
}
+  /** Allows a new run only once the configured {@code interval} has passed since the last one. */
+  @Override
+  protected boolean shouldExecute(Long lastCleanupEndTime) {
+    long elapsed = System.currentTimeMillis() - lastCleanupEndTime;
+    return elapsed >= interval.toMillis();
+  }
+
+  /** Orphan file cleaning records its completion time under its own operation key. */
+  @Override
+  protected CleanupOperation getCleanupOperation() {
+    return CleanupOperation.ORPHAN_FILES_CLEANING;
+  }
+
@Override
protected boolean enabled(TableRuntime tableRuntime) {
return tableRuntime.getTableConfiguration().isCleanOrphanEnabled();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
index f7d0cb927..15f2d49d9 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
@@ -25,6 +25,7 @@ import
org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,16 @@ public class SnapshotsExpiringExecutor extends
PeriodicTableScheduler {
scheduleIfNecessary(tableRuntime, getStartDelay());
}
+  /** Allows a new run only once at least {@code INTERVAL} has passed since the last one. */
+  @Override
+  protected boolean shouldExecute(Long lastCleanupEndTime) {
+    long elapsed = System.currentTimeMillis() - lastCleanupEndTime;
+    return elapsed >= INTERVAL;
+  }
+
+  /** Snapshot expiration records its completion time under its own operation key. */
+  @Override
+  protected CleanupOperation getCleanupOperation() {
+    return CleanupOperation.SNAPSHOTS_EXPIRING;
+  }
+
@Override
protected long getExecutorDelay() {
return ThreadLocalRandom.current().nextLong(INTERVAL);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index f2845cc71..50bd12585 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -42,6 +42,8 @@ import
org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.table.blocker.TableBlocker;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
+import org.apache.amoro.server.table.cleanup.TableRuntimeCleanupState;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
@@ -81,11 +83,17 @@ public class DefaultTableRuntime extends
AbstractTableRuntime
.jsonType(AbstractOptimizingEvaluator.PendingInput.class)
.defaultValue(new AbstractOptimizingEvaluator.PendingInput());
+  /**
+   * JSON-typed state holding this table's last completion time per cleanup operation. When
+   * nothing has been stored, the default is a TableRuntimeCleanupState whose times are all 0.
+   * NOTE(review): the default is one shared instance; callers should not mutate it in place.
+   */
+ private static final StateKey<TableRuntimeCleanupState> CLEANUP_STATE_KEY =
+ StateKey.stateKey("cleanup_state")
+ .jsonType(TableRuntimeCleanupState.class)
+ .defaultValue(new TableRuntimeCleanupState());
+
private static final StateKey<Long> PROCESS_ID_KEY =
StateKey.stateKey("process_id").longType().defaultValue(0L);
public static final List<StateKey<?>> REQUIRED_STATES =
- Lists.newArrayList(OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY,
PROCESS_ID_KEY);
+  // CLEANUP_STATE_KEY is declared alongside the existing required states of this runtime.
+ Lists.newArrayList(
+ OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY, PROCESS_ID_KEY,
CLEANUP_STATE_KEY);
private final Map<Action, TableProcessContainer> processContainerMap =
Maps.newConcurrentMap();
private final TableOptimizingMetrics optimizingMetrics;
@@ -353,6 +361,47 @@ public class DefaultTableRuntime extends
AbstractTableRuntime
.commit();
}
+  /**
+   * Returns the last recorded completion time of {@code operation} for this table.
+   *
+   * @param operation the cleanup operation to look up
+   * @return the stored time, or {@code 0L} if none was recorded or the operation has no stored
+   *     value (e.g. {@link CleanupOperation#NONE})
+   */
+  public long getLastCleanTime(CleanupOperation operation) {
+    TableRuntimeCleanupState cleanupState = store().getState(CLEANUP_STATE_KEY);
+    long lastTime;
+    switch (operation) {
+      case ORPHAN_FILES_CLEANING:
+        lastTime = cleanupState.getLastOrphanFilesCleanTime();
+        break;
+      case DANGLING_DELETE_FILES_CLEANING:
+        lastTime = cleanupState.getLastDanglingDeleteFilesCleanTime();
+        break;
+      case DATA_EXPIRING:
+        lastTime = cleanupState.getLastDataExpiringTime();
+        break;
+      case SNAPSHOTS_EXPIRING:
+        lastTime = cleanupState.getLastSnapshotsExpiringTime();
+        break;
+      default:
+        lastTime = 0L;
+        break;
+    }
+    return lastTime;
+  }
+
+ public void updateLastCleanTime(CleanupOperation operation, long time) {
+ store()
+ .begin()
+ .updateState(
+ CLEANUP_STATE_KEY,
+ state -> {
+ switch (operation) {
+ case ORPHAN_FILES_CLEANING:
+ state.setLastOrphanFilesCleanTime(time);
+ break;
+ case DANGLING_DELETE_FILES_CLEANING:
+ state.setLastDanglingDeleteFilesCleanTime(time);
+ break;
+ case DATA_EXPIRING:
+ state.setLastDataExpiringTime(time);
+ break;
+ case SNAPSHOTS_EXPIRING:
+ state.setLastSnapshotsExpiringTime(time);
+ break;
+ }
+ return state;
+ })
+ .commit();
+ }
+
public void completeProcess(boolean success) {
OptimizingStatus originalStatus = getOptimizingStatus();
OptimizingType processType = optimizingProcess.getOptimizingType();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
new file mode 100644
index 000000000..10afefe63
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.table.cleanup;
+
+/** Table cleanup operation enum. Defines different operation types for table
cleanup tasks. */
+public enum CleanupOperation {
+ DANGLING_DELETE_FILES_CLEANING,
+ ORPHAN_FILES_CLEANING,
+ DATA_EXPIRING,
+ SNAPSHOTS_EXPIRING,
+ // NONE indicates operation types where no cleanup process records are
+ // saved in the table_runtime_state table.
+ NONE;
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
new file mode 100644
index 000000000..9dfb98f6e
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.table.cleanup;
+
+/**
+ * Mutable per-table record of when each tracked cleanup operation last completed. Every value
+ * starts at {@code 0}, which callers treat as "never recorded". Exposed through plain bean
+ * getters/setters so it can be stored via a JSON-typed state key.
+ */
+public class TableRuntimeCleanupState {
+  /** Last completion time of orphan file cleaning. */
+  private long lastOrphanFilesCleanTime;
+  /** Last completion time of dangling delete file cleaning. */
+  private long lastDanglingDeleteFilesCleanTime;
+  /** Last completion time of data expiration. */
+  private long lastDataExpiringTime;
+  /** Last completion time of snapshot expiration. */
+  private long lastSnapshotsExpiringTime;
+
+  public long getLastOrphanFilesCleanTime() {
+    return this.lastOrphanFilesCleanTime;
+  }
+
+  public void setLastOrphanFilesCleanTime(long time) {
+    lastOrphanFilesCleanTime = time;
+  }
+
+  public long getLastDanglingDeleteFilesCleanTime() {
+    return this.lastDanglingDeleteFilesCleanTime;
+  }
+
+  public void setLastDanglingDeleteFilesCleanTime(long time) {
+    lastDanglingDeleteFilesCleanTime = time;
+  }
+
+  public long getLastDataExpiringTime() {
+    return this.lastDataExpiringTime;
+  }
+
+  public void setLastDataExpiringTime(long time) {
+    lastDataExpiringTime = time;
+  }
+
+  public long getLastSnapshotsExpiringTime() {
+    return this.lastSnapshotsExpiringTime;
+  }
+
+  public void setLastSnapshotsExpiringTime(long time) {
+    lastSnapshotsExpiringTime = time;
+  }
+}
diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
index 3f4c9031e..b820a762d 100644
--- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
+++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
@@ -151,8 +151,6 @@ CREATE TABLE `table_process`
KEY `table_index` (`table_id`, `create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of optimizing after
each commit';
-
-
CREATE TABLE `optimizing_process_state`
(
`process_id` bigint(20) NOT NULL COMMENT
'optimizing_procedure UUID',
diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
index bc4375cd8..5f4073ec7 100644
--- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
+++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
@@ -212,7 +212,6 @@ comment on column table_runtime_state.state_version is
'Table runtime state vers
comment on column table_runtime_state.create_time is 'create time';
comment on column table_runtime_state.update_time is 'update time';
-
CREATE TABLE table_process (
process_id bigserial PRIMARY KEY,
table_id bigint NOT NULL,
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
new file mode 100644
index 000000000..50c0e9d19
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.scheduler.inline;
+
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
+import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
+
+/**
+ * Configurable {@link PeriodicTableScheduler} for exercising the scheduler's cleanup-time logic
+ * in tests. The cleanup operation and the {@code enabled} answer are fixed at construction, and
+ * {@link #execute(TableRuntime)} does nothing.
+ */
+class PeriodicTableSchedulerTestBase extends PeriodicTableScheduler {
+  private static final long ONE_HOUR_MS = 60 * 60 * 1000L;
+  private static final long ONE_DAY_MS = 24 * ONE_HOUR_MS;
+
+  private static final long SNAPSHOTS_EXPIRING_INTERVAL = ONE_HOUR_MS;
+  private static final long ORPHAN_FILES_CLEANING_INTERVAL = ONE_DAY_MS;
+  private static final long DANGLING_DELETE_FILES_CLEANING_INTERVAL = ONE_DAY_MS;
+  private static final long DATA_EXPIRING_INTERVAL = ONE_HOUR_MS;
+
+  /** Marker for "no minimum gap", i.e. the operation may always run. */
+  private static final long NO_INTERVAL = -1L;
+
+  private final CleanupOperation cleanupOperation;
+  private final boolean enabled;
+
+  public PeriodicTableSchedulerTestBase(
+      TableService tableService, CleanupOperation cleanupOperation, boolean enabled) {
+    super(tableService, 1);
+    this.cleanupOperation = cleanupOperation;
+    this.enabled = enabled;
+  }
+
+  @Override
+  protected CleanupOperation getCleanupOperation() {
+    return cleanupOperation;
+  }
+
+  @Override
+  protected long getNextExecutingTime(TableRuntime tableRuntime) {
+    return 1000;
+  }
+
+  @Override
+  protected boolean enabled(TableRuntime tableRuntime) {
+    return enabled;
+  }
+
+  @Override
+  protected void execute(TableRuntime tableRuntime) {
+    // Intentionally empty: tests only inspect scheduling decisions.
+  }
+
+  @Override
+  protected long getExecutorDelay() {
+    return 0;
+  }
+
+  @Override
+  protected boolean shouldExecute(Long lastCleanupEndTime) {
+    long now = System.currentTimeMillis();
+    long requiredGap = intervalFor(cleanupOperation);
+    return requiredGap == NO_INTERVAL || now - lastCleanupEndTime >= requiredGap;
+  }
+
+  /** Returns the minimum gap between runs of {@code operation}, or {@link #NO_INTERVAL}. */
+  private static long intervalFor(CleanupOperation operation) {
+    switch (operation) {
+      case SNAPSHOTS_EXPIRING:
+        return SNAPSHOTS_EXPIRING_INTERVAL;
+      case ORPHAN_FILES_CLEANING:
+        return ORPHAN_FILES_CLEANING_INTERVAL;
+      case DANGLING_DELETE_FILES_CLEANING:
+        return DANGLING_DELETE_FILES_CLEANING_INTERVAL;
+      case DATA_EXPIRING:
+        return DATA_EXPIRING_INTERVAL;
+      default:
+        return NO_INTERVAL;
+    }
+  }
+
+  /** Exposes the protected {@code shouldExecuteTask} to tests in this package. */
+  public boolean shouldExecuteTaskForTest(
+      TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
+    return shouldExecuteTask(tableRuntime, cleanupOperation);
+  }
+}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
new file mode 100644
index 000000000..c401c88d8
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.scheduler.inline;
+
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.server.persistence.TableRuntimeMeta;
+import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.DefaultTableRuntimeStore;
+import org.apache.amoro.server.table.TableRuntimeHandler;
+import org.apache.amoro.server.table.cleanup.CleanupOperation;
+import org.apache.amoro.table.TableRuntimeStore;
+import org.apache.amoro.table.TableSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests how {@link org.apache.amoro.server.scheduler.PeriodicTableScheduler} decides, from the
+ * persisted last-completion time of each cleanup operation, whether a table's task should run.
+ */
+public class TestPeriodicTableSchedulerCleanup extends PersistentBase {
+
+  private static final String TEST_CATALOG = "test_catalog";
+  private static final String TEST_DB = "test_db";
+  private static final String TEST_TABLE = "test_table";
+  private static final long TEST_TABLE_ID = 1L;
+
+  /** Every operation that persists a last-completion time (all but NONE). */
+  private static final List<CleanupOperation> TRACKED_OPERATIONS =
+      Collections.unmodifiableList(
+          Arrays.asList(
+              CleanupOperation.ORPHAN_FILES_CLEANING,
+              CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
+              CleanupOperation.DATA_EXPIRING,
+              CleanupOperation.SNAPSHOTS_EXPIRING));
+
+  static {
+    try {
+      Class.forName("org.apache.amoro.server.table.DerbyPersistence");
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize Derby persistence", e);
+    }
+  }
+
+  private static final TableRuntimeHandler TEST_HANDLER =
+      new TableRuntimeHandler() {
+        @Override
+        public void handleTableChanged(
+            TableRuntime tableRuntime,
+            org.apache.amoro.server.optimizing.OptimizingStatus originalStatus) {}
+
+        @Override
+        public void handleTableChanged(
+            TableRuntime tableRuntime, TableConfiguration originalConfig) {}
+      };
+
+  /**
+   * Create a test server table identifier with the given ID.
+   *
+   * @param tableId the table ID
+   * @return a ServerTableIdentifier instance
+   */
+  private ServerTableIdentifier createTableIdentifier(long tableId) {
+    return ServerTableIdentifier.of(
+        tableId, TEST_CATALOG, TEST_DB, TEST_TABLE + "_" + tableId, TableFormat.ICEBERG);
+  }
+
+  /**
+   * Create a test DefaultTableRuntime with the given identifier.
+   *
+   * @param identifier the table identifier
+   * @return a DefaultTableRuntime instance
+   */
+  private DefaultTableRuntime createDefaultTableRuntime(ServerTableIdentifier identifier) {
+    TableRuntimeMeta meta = new TableRuntimeMeta();
+    meta.setTableId(identifier.getId());
+    meta.setGroupName("test_group");
+    meta.setStatusCode(0);
+    meta.setTableConfig(Collections.emptyMap());
+    meta.setTableSummary(new TableSummary());
+
+    TableRuntimeStore store =
+        new DefaultTableRuntimeStore(
+            identifier, meta, DefaultTableRuntime.REQUIRED_STATES, Collections.emptyList());
+
+    return new DefaultTableRuntime(store);
+  }
+
+  /** Removes persisted runtime rows, runtime states and table metadata for the given ids. */
+  private void cleanUpTableRuntimeData(List<Long> tableIds) {
+    doAs(
+        TableRuntimeMapper.class,
+        mapper -> {
+          for (Long tableId : tableIds) {
+            try {
+              mapper.deleteRuntime(tableId);
+              mapper.removeAllTableStates(tableId);
+            } catch (Exception ignored) {
+              // Best effort: the rows may not exist yet.
+            }
+          }
+        });
+    doAs(
+        TableMetaMapper.class,
+        mapper -> {
+          for (Long tableId : tableIds) {
+            try {
+              mapper.deleteTableIdById(tableId);
+            } catch (Exception ignored) {
+              // Best effort: the table may not exist yet.
+            }
+          }
+        });
+  }
+
+  /**
+   * Prepare the test environment by removing any data left for the given tables.
+   *
+   * @param testTableIds list of table IDs to clean up
+   */
+  private void prepareTestEnvironment(List<Long> testTableIds) {
+    cleanUpTableRuntimeData(testTableIds);
+  }
+
+  /** Resets persisted data for the test table and returns a fresh runtime for it. */
+  private DefaultTableRuntime freshTableRuntime() {
+    prepareTestEnvironment(Collections.singletonList(TEST_TABLE_ID));
+    return createDefaultTableRuntime(createTableIdentifier(TEST_TABLE_ID));
+  }
+
+  /**
+   * Create a test table executor.
+   *
+   * @param cleanupOperation the cleanup operation to use
+   * @param enabled whether the executor should be enabled
+   * @return a new PeriodicTableSchedulerTestBase instance
+   */
+  private PeriodicTableSchedulerTestBase createTestExecutor(
+      CleanupOperation cleanupOperation, boolean enabled) {
+    return new PeriodicTableSchedulerTestBase(null, cleanupOperation, enabled);
+  }
+
+  /**
+   * Create a test table executor with the default enabled state (true).
+   *
+   * @param cleanupOperation the cleanup operation to use
+   * @return a new PeriodicTableSchedulerTestBase instance
+   */
+  private PeriodicTableSchedulerTestBase createTestExecutor(CleanupOperation cleanupOperation) {
+    return createTestExecutor(cleanupOperation, true);
+  }
+
+  /** Without a recorded completion time, every tracked operation should run immediately. */
+  @Test
+  public void testShouldExecuteTaskWithNoPreviousCleanup() {
+    for (CleanupOperation operation : TRACKED_OPERATIONS) {
+      DefaultTableRuntime tableRuntime = freshTableRuntime();
+      PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
+
+      Assert.assertTrue(
+          "Should execute when there's no previous cleanup time for operation " + operation,
+          executor.shouldExecuteTaskForTest(tableRuntime, operation));
+    }
+  }
+
+  /** A completion recorded 10 seconds ago is too recent for every tracked operation. */
+  @Test
+  public void testShouldNotExecuteTaskWithRecentCleanup() {
+    for (CleanupOperation operation : TRACKED_OPERATIONS) {
+      DefaultTableRuntime tableRuntime = freshTableRuntime();
+      PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
+
+      long recentTime = System.currentTimeMillis() - 10000L;
+      tableRuntime.updateLastCleanTime(operation, recentTime);
+
+      Assert.assertFalse(
+          "Should not execute when recently cleaned up for operation " + operation,
+          executor.shouldExecuteTaskForTest(tableRuntime, operation));
+    }
+  }
+
+  /** A completion recorded 30 hours ago exceeds every tracked operation's interval. */
+  @Test
+  public void testShouldExecuteTaskWithOldCleanup() {
+    for (CleanupOperation operation : TRACKED_OPERATIONS) {
+      DefaultTableRuntime tableRuntime = freshTableRuntime();
+      PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
+
+      long oldTime = System.currentTimeMillis() - 30 * 60 * 60 * 1000L;
+      tableRuntime.updateLastCleanTime(operation, oldTime);
+
+      Assert.assertTrue(
+          "Should execute when enough time has passed since last cleanup for operation "
+              + operation,
+          executor.shouldExecuteTaskForTest(tableRuntime, operation));
+    }
+  }
+
+  /** The NONE operation bypasses the cleanup-time check and always runs. */
+  @Test
+  public void testShouldExecuteTaskWithNoneOperation() {
+    DefaultTableRuntime tableRuntime = freshTableRuntime();
+    PeriodicTableSchedulerTestBase executor = createTestExecutor(CleanupOperation.NONE);
+
+    Assert.assertTrue(
+        "Should always execute with NONE operation",
+        executor.shouldExecuteTaskForTest(tableRuntime, CleanupOperation.NONE));
+  }
+}