This is an automated email from the ASF dual-hosted git repository.
zhoujinsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 23b0ab98e [AMORO-4217] Clean up invalid code and parameters left over
from the snapshot-expiring refactoring (#4226)
23b0ab98e is described below
commit 23b0ab98ef4a07b60a7e4637692fb582fe8e8ca8
Author: WenLingzhang <[email protected]>
AuthorDate: Wed May 20 14:57:19 2026 +0800
[AMORO-4217] Clean up invalid code and parameters left over from the
snapshot-expiring refactoring (#4226)
* Remove unused code
* Add assertions for containers in JacksonUtilTest
* Clean up JacksonUtilTest by removing blank line
Removed unnecessary blank line in JacksonUtilTest.
* Remove unnecessary newline in jacksonTest method
---------
Co-authored-by: 张文领 <[email protected]>
---
.../apache/amoro/server/AmoroManagementConf.java | 18 --
.../amoro/server/AmoroManagementConfValidator.java | 4 -
.../server/scheduler/PeriodicTableScheduler.java | 121 +---------
.../inline/SnapshotsExpiringExecutor.java | 86 -------
.../amoro/server/table/DefaultTableRuntime.java | 27 ---
.../server/table/cleanup/CleanupOperation.java | 27 ---
.../server/TestAmoroManagementConfValidator.java | 8 -
.../inline/PeriodicTableSchedulerTestBase.java | 82 -------
.../inline/TestConfigurableIntervalExecutors.java | 34 +--
.../inline/TestPeriodicTableSchedulerCleanup.java | 250 ---------------------
.../org/apache/amoro/utils/JacksonUtilTest.java | 6 +-
charts/amoro/templates/amoro-configmap.yaml | 4 -
dist/src/main/amoro-bin/conf/config.yaml | 4 -
docs/configuration/ams-config.md | 3 -
14 files changed, 4 insertions(+), 670 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index fe72b87b6..292a9975f 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -156,24 +156,6 @@ public class AmoroManagementConf {
.defaultValue(1000000)
.withDescription("The queue size of the executors of the external
catalog explorer.");
- public static final ConfigOption<Boolean> EXPIRE_SNAPSHOTS_ENABLED =
- ConfigOptions.key("expire-snapshots.enabled")
- .booleanType()
- .defaultValue(true)
- .withDescription("Enable snapshots expiring.");
-
- public static final ConfigOption<Integer> EXPIRE_SNAPSHOTS_THREAD_COUNT =
- ConfigOptions.key("expire-snapshots.thread-count")
- .intType()
- .defaultValue(10)
- .withDescription("The number of threads used for snapshots
expiring.");
-
- public static final ConfigOption<Duration> EXPIRE_SNAPSHOTS_INTERVAL =
- ConfigOptions.key("expire-snapshots.interval")
- .durationType()
- .defaultValue(Duration.ofHours(1))
- .withDescription("Interval for expiring snapshots.");
-
public static final ConfigOption<Boolean> SYNC_HIVE_TABLES_ENABLED =
ConfigOptions.key("sync-hive-tables.enabled")
.booleanType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
index 627b577d6..7c767aa14 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
@@ -82,10 +82,6 @@ public class AmoroManagementConfValidator {
validateThreadCount(configurations,
AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT);
validateThreadCount(configurations,
AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT);
- if
(configurations.getBoolean(AmoroManagementConf.EXPIRE_SNAPSHOTS_ENABLED)) {
- validateThreadCount(configurations,
AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT);
- }
-
if
(configurations.getBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED)) {
validateThreadCount(configurations,
AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT);
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
index 1f19187e9..8c1d0a095 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
@@ -25,10 +25,8 @@ import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.server.optimizing.OptimizingStatus;
-import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableService;
-import org.apache.amoro.server.table.cleanup.CleanupOperation;
import
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -86,34 +84,13 @@ public abstract class PeriodicTableScheduler extends
RuntimeHandlerChain {
.forEach(
tableRuntime -> {
if (scheduledTables.add(tableRuntime.getTableIdentifier())) {
- scheduleTableExecution(
- tableRuntime, calculateExecutionDelay(tableRuntime,
getCleanupOperation()));
+ scheduleTableExecution(tableRuntime, getStartDelay());
}
});
logger.info("Table executor {} initialized", getClass().getSimpleName());
}
- private long calculateExecutionDelay(
- TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
- // If the table needs to be executed immediately, schedule it to run after
a short delay.
- if (shouldExecuteTask(tableRuntime, cleanupOperation)) {
- return getStartDelay();
- }
-
- // If the table does not need to be executed immediately, schedule it for
the next execution
- // time.
- // Adding getStartDelay() helps distribute the execution time of multiple
tables,
- // reducing the probability of simultaneous execution and system load
spikes.
- return getNextExecutingTime(tableRuntime) + getStartDelay();
- }
-
- /**
- * Schedule a table for execution with the specified delay.
- *
- * @param tableRuntime The table runtime to schedule
- * @param delay The delay in milliseconds before execution
- */
private void scheduleTableExecution(TableRuntime tableRuntime, long delay) {
executor.schedule(() -> executeTask(tableRuntime), delay,
TimeUnit.MILLISECONDS);
logger.debug(
@@ -126,9 +103,6 @@ public abstract class PeriodicTableScheduler extends
RuntimeHandlerChain {
try {
if (isExecutable(tableRuntime)) {
execute(tableRuntime);
- // Different tables take different amounts of time to execute the end
of execute(),
- // so you need to perform the update operation separately for each
table.
- persistUpdatingCleanupTime(tableRuntime);
}
} catch (Exception e) {
logger.error("exception when schedule for table: {}",
tableRuntime.getTableIdentifier(), e);
@@ -152,99 +126,6 @@ public abstract class PeriodicTableScheduler extends
RuntimeHandlerChain {
protected abstract void execute(TableRuntime tableRuntime);
- protected boolean shouldExecute(Long lastCleanupEndTime) {
- return true;
- }
-
- private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
- CleanupOperation cleanupOperation = getCleanupOperation();
- if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
- return;
- }
-
- try {
- long currentTime = System.currentTimeMillis();
- ((DefaultTableRuntime)
tableRuntime).updateLastCleanTime(cleanupOperation, currentTime);
-
- logger.debug(
- "Update lastCleanTime for table {} with cleanup operation {}",
- tableRuntime.getTableIdentifier().getTableName(),
- cleanupOperation);
- } catch (Exception e) {
- logger.error(
- "Failed to update lastCleanTime for table {}",
- tableRuntime.getTableIdentifier().getTableName(),
- e);
- }
- }
-
- /**
- * Get cleanup operation. Default is NONE, subclasses should override this
method to provide
- * specific operation.
- *
- * @return cleanup operation
- */
- protected CleanupOperation getCleanupOperation() {
- return CleanupOperation.NONE;
- }
-
- protected boolean shouldExecuteTask(
- TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
- if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
- return true;
- }
-
- long lastCleanupEndTime =
- ((DefaultTableRuntime)
tableRuntime).getLastCleanTime(cleanupOperation);
-
- // If it's zero, execute the task
- if (lastCleanupEndTime == 0L) {
- logger.debug(
- "LastCleanupTime for table {} with operation {} is not exist,
executing task",
- tableRuntime.getTableIdentifier().getTableName(),
- cleanupOperation);
- return true;
- }
-
- // After ams restarts, certain cleanup operations can only be re-executed
- // if sufficient time has elapsed since the last cleanup.
- boolean result = shouldExecute(lastCleanupEndTime);
- logger.debug(
- result
- ? "Should execute task for table {} with {}"
- : "Not enough time has passed since last cleanup for table {} with
{}, delaying execution",
- tableRuntime.getTableIdentifier().getTableName(),
- cleanupOperation);
-
- return result;
- }
-
- /**
- * Check if the operation should be skipped based on common conditions.
- *
- * @param tableRuntime the table runtime to check
- * @param cleanupOperation the cleanup operation to perform
- * @return true if the operation should be skipped, false otherwise
- */
- private boolean shouldSkipOperation(
- TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
- if (cleanupOperation == CleanupOperation.NONE) {
- logger.debug(
- "No cleanup operation specified, skipping cleanup time check for
table {}",
- tableRuntime.getTableIdentifier().getTableName());
- return true;
- }
-
- if (!(tableRuntime instanceof DefaultTableRuntime)) {
- logger.debug(
- "Table runtime is not DefaultTableRuntime, skipping cleanup time
check for table {}",
- tableRuntime.getTableIdentifier().getTableName());
- return true;
- }
-
- return false;
- }
-
protected String getThreadName() {
return String.join("-",
StringUtils.splitByCharacterTypeCamelCase(getClass().getSimpleName()))
.toLowerCase(Locale.ROOT);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
deleted file mode 100644
index c1788436d..000000000
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.scheduler.inline;
-
-import org.apache.amoro.AmoroTable;
-import org.apache.amoro.TableRuntime;
-import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.maintainer.TableMaintainer;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
-import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.TableService;
-import org.apache.amoro.server.table.cleanup.CleanupOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.concurrent.ThreadLocalRandom;
-
-/** Service for expiring tables periodically. */
-public class SnapshotsExpiringExecutor extends PeriodicTableScheduler {
- private static final Logger LOG =
LoggerFactory.getLogger(SnapshotsExpiringExecutor.class);
-
- private final Duration interval;
-
- public SnapshotsExpiringExecutor(TableService tableService, int poolSize,
Duration interval) {
- super(tableService, poolSize);
- this.interval = interval;
- }
-
- @Override
- protected long getNextExecutingTime(TableRuntime tableRuntime) {
- return interval.toMillis();
- }
-
- @Override
- protected boolean enabled(TableRuntime tableRuntime) {
- return tableRuntime.getTableConfiguration().isExpireSnapshotEnabled();
- }
-
- @Override
- public void handleConfigChanged(TableRuntime tableRuntime,
TableConfiguration originalConfig) {
- scheduleIfNecessary(tableRuntime, getStartDelay());
- }
-
- @Override
- protected boolean shouldExecute(Long lastCleanupEndTime) {
- return System.currentTimeMillis() - lastCleanupEndTime >=
interval.toMillis();
- }
-
- @Override
- protected CleanupOperation getCleanupOperation() {
- return CleanupOperation.SNAPSHOTS_EXPIRING;
- }
-
- @Override
- protected long getExecutorDelay() {
- return ThreadLocalRandom.current().nextLong(interval.toMillis());
- }
-
- @Override
- public void execute(TableRuntime tableRuntime) {
- try {
- AmoroTable<?> amoroTable = loadTable(tableRuntime);
- TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable,
tableRuntime);
- tableMaintainer.expireSnapshots();
- } catch (Throwable t) {
- LOG.error("unexpected expire error of table {} ",
tableRuntime.getTableIdentifier(), t);
- }
- }
-}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index 7fd9c075a..e2608a1e6 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -37,7 +37,6 @@ import
org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.table.blocker.TableBlocker;
-import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.apache.amoro.server.table.cleanup.TableRuntimeCleanupState;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
@@ -339,32 +338,6 @@ public class DefaultTableRuntime extends
AbstractTableRuntime {
.commit();
}
- public long getLastCleanTime(CleanupOperation operation) {
- TableRuntimeCleanupState state = store().getState(CLEANUP_STATE_KEY);
- switch (operation) {
- case SNAPSHOTS_EXPIRING:
- return state.getLastSnapshotsExpiringTime();
- default:
- return 0L;
- }
- }
-
- public void updateLastCleanTime(CleanupOperation operation, long time) {
- store()
- .begin()
- .updateState(
- CLEANUP_STATE_KEY,
- state -> {
- switch (operation) {
- case SNAPSHOTS_EXPIRING:
- state.setLastSnapshotsExpiringTime(time);
- break;
- }
- return state;
- })
- .commit();
- }
-
public void completeProcess(boolean success) {
OptimizingStatus originalStatus = getOptimizingStatus();
OptimizingType processType = optimizingProcess.getOptimizingType();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
deleted file mode 100644
index 958329081..000000000
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.table.cleanup;
-
-/** Table cleanup operation enum. Defines different operation types for table
cleanup tasks. */
-public enum CleanupOperation {
- SNAPSHOTS_EXPIRING,
- // NONE indicates operation types where no cleanup process records are
- // saved in the table_runtime_state table.
- NONE;
-}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java
index 532bc1888..40f88575d 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java
@@ -127,14 +127,6 @@ public class TestAmoroManagementConfValidator {
configurations.setInteger(AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT,
10);
AmoroManagementConfValidator.validateConfig(configurations);
- configurations.setBoolean(AmoroManagementConf.EXPIRE_SNAPSHOTS_ENABLED,
true);
-
configurations.setInteger(AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT,
-1);
- Assert.assertThrows(
- IllegalArgumentException.class,
- () -> AmoroManagementConfValidator.validateConfig(configurations));
-
configurations.setInteger(AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT,
10);
- AmoroManagementConfValidator.validateConfig(configurations);
-
configurations.setBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED,
true);
configurations.setInteger(AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT,
-1);
Assert.assertThrows(
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
deleted file mode 100644
index 70aed2fce..000000000
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.scheduler.inline;
-
-import org.apache.amoro.TableRuntime;
-import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.TableService;
-import org.apache.amoro.server.table.cleanup.CleanupOperation;
-
-/**
- * Test table executor implementation for testing PeriodicTableScheduler
functionality. This class
- * allows configuration of cleanup operations and enabled state for testing
purposes.
- */
-class PeriodicTableSchedulerTestBase extends PeriodicTableScheduler {
- private final CleanupOperation cleanupOperation;
- private final boolean enabled;
- private static final long SNAPSHOTS_EXPIRING_INTERVAL = 60 * 60 * 1000L; //
1 hour
-
- public PeriodicTableSchedulerTestBase(
- TableService tableService, CleanupOperation cleanupOperation, boolean
enabled) {
- super(tableService, 1);
- this.cleanupOperation = cleanupOperation;
- this.enabled = enabled;
- }
-
- @Override
- protected CleanupOperation getCleanupOperation() {
- return cleanupOperation;
- }
-
- @Override
- protected long getNextExecutingTime(TableRuntime tableRuntime) {
- return 1000;
- }
-
- @Override
- protected boolean enabled(TableRuntime tableRuntime) {
- return enabled;
- }
-
- @Override
- protected void execute(TableRuntime tableRuntime) {
- // Do nothing in test
- }
-
- @Override
- protected long getExecutorDelay() {
- return 0;
- }
-
- @Override
- protected boolean shouldExecute(Long lastCleanupEndTime) {
- long currentTime = System.currentTimeMillis();
- switch (cleanupOperation) {
- case SNAPSHOTS_EXPIRING:
- return currentTime - lastCleanupEndTime >= SNAPSHOTS_EXPIRING_INTERVAL;
- default:
- return true;
- }
- }
-
- public boolean shouldExecuteTaskForTest(
- TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
- return shouldExecuteTask(tableRuntime, cleanupOperation);
- }
-}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
index 6c8e05142..4a7c2895d 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
@@ -25,29 +25,9 @@ import org.mockito.Mockito;
import java.time.Duration;
-/** Tests for configurable interval in SnapshotsExpiringExecutor and
ProcessDataExpiringExecutor. */
+/** Tests for configurable interval in ProcessDataExpiringExecutor. */
public class TestConfigurableIntervalExecutors {
- @Test
- public void testSnapshotsExpiringDefaultInterval() {
- Duration interval = Duration.ofHours(1);
- SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null,
1, interval);
-
- TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
- Assert.assertEquals(
- Duration.ofHours(1).toMillis(),
executor.getNextExecutingTime(tableRuntime));
- }
-
- @Test
- public void testSnapshotsExpiringCustomInterval() {
- Duration interval = Duration.ofMinutes(30);
- SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null,
1, interval);
-
- TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
- Assert.assertEquals(
- Duration.ofMinutes(30).toMillis(),
executor.getNextExecutingTime(tableRuntime));
- }
-
@Test
public void testProcessDataExpiringDefaultInterval() {
Duration optimizingKeepTime = Duration.ofDays(30);
@@ -73,16 +53,4 @@ public class TestConfigurableIntervalExecutors {
Assert.assertEquals(
Duration.ofMinutes(30).toMillis(),
executor.getNextExecutingTime(tableRuntime));
}
-
- @Test
- public void testSnapshotsExpiringShouldExecuteAfterInterval() {
- Duration interval = Duration.ofHours(2);
- SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null,
1, interval);
-
- long now = System.currentTimeMillis();
- // 3 hours ago - should execute
- Assert.assertTrue(executor.shouldExecute(now -
Duration.ofHours(3).toMillis()));
- // 1 hour ago - should not execute
- Assert.assertFalse(executor.shouldExecute(now -
Duration.ofHours(1).toMillis()));
- }
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
deleted file mode 100644
index 40939505d..000000000
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.scheduler.inline;
-
-import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.TableFormat;
-import org.apache.amoro.TableRuntime;
-import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.server.persistence.PersistentBase;
-import org.apache.amoro.server.persistence.TableRuntimeMeta;
-import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
-import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper;
-import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.server.table.DefaultTableRuntimeStore;
-import org.apache.amoro.server.table.TableRuntimeHandler;
-import org.apache.amoro.server.table.cleanup.CleanupOperation;
-import org.apache.amoro.table.TableRuntimeStore;
-import org.apache.amoro.table.TableSummary;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * This class tests all aspects of cleanup operation handling in {@link
- * org.apache.amoro.server.scheduler.PeriodicTableScheduler}.
- */
-public class TestPeriodicTableSchedulerCleanup extends PersistentBase {
-
- private static final String TEST_CATALOG = "test_catalog";
- private static final String TEST_DB = "test_db";
- private static final String TEST_TABLE = "test_table";
-
- static {
- try {
- Class.forName("org.apache.amoro.server.table.DerbyPersistence");
- } catch (Exception e) {
- throw new RuntimeException("Failed to initialize Derby persistence", e);
- }
- }
-
- private static final TableRuntimeHandler TEST_HANDLER =
- new TableRuntimeHandler() {
- @Override
- public void handleTableChanged(
- TableRuntime tableRuntime,
- org.apache.amoro.server.optimizing.OptimizingStatus
originalStatus) {}
-
- @Override
- public void handleTableChanged(
- TableRuntime tableRuntime, TableConfiguration originalConfig) {}
- };
-
- /**
- * Create a test server table identifier with the given ID
- *
- * @param tableId the table ID
- * @return a ServerTableIdentifier instance
- */
- private ServerTableIdentifier createTableIdentifier(long tableId) {
- return ServerTableIdentifier.of(
- tableId, TEST_CATALOG, TEST_DB, TEST_TABLE + "_" + tableId,
TableFormat.ICEBERG);
- }
-
- /**
- * Create a test DefaultTableRuntime with the given identifier
- *
- * @param identifier the table identifier
- * @return a DefaultTableRuntime instance
- */
- private DefaultTableRuntime createDefaultTableRuntime(ServerTableIdentifier
identifier) {
- // Create table runtime meta
- TableRuntimeMeta meta = new TableRuntimeMeta();
- meta.setTableId(identifier.getId());
- meta.setGroupName("test_group");
- meta.setStatusCode(0);
- meta.setTableConfig(Collections.emptyMap());
- meta.setTableSummary(new TableSummary());
-
- // Create table runtime store
- TableRuntimeStore store =
- new DefaultTableRuntimeStore(
- identifier, meta, DefaultTableRuntime.REQUIRED_STATES,
Collections.emptyList());
-
- return new DefaultTableRuntime(store, () -> null);
- }
-
- private void cleanUpTableRuntimeData(List<Long> tableIds) {
- doAs(
- TableRuntimeMapper.class,
- mapper -> {
- for (Long tableId : tableIds) {
- try {
- mapper.deleteRuntime(tableId);
- mapper.removeAllTableStates(tableId);
- } catch (Exception e) {
- // Ignore if tables don't exist
- }
- }
- });
- doAs(
- TableMetaMapper.class,
- mapper -> {
- for (Long tableId : tableIds) {
- try {
- mapper.deleteTableIdById(tableId);
- } catch (Exception e) {
- // Ignore if tables don't exist
- }
- }
- });
- }
-
- /**
- * Prepare test environment by cleaning up test data and table runtime data
- *
- * @param testTableIds list of table IDs to clean up
- */
- private void prepareTestEnvironment(List<Long> testTableIds) {
- cleanUpTableRuntimeData(testTableIds);
- }
-
- /**
- * Create a test table executor
- *
- * @param cleanupOperation the cleanup operation to use
- * @param enabled whether the executor should be enabled
- * @return a new PeriodicTableSchedulerTestBase instance
- */
- private PeriodicTableSchedulerTestBase createTestExecutor(
- CleanupOperation cleanupOperation, boolean enabled) {
- return new PeriodicTableSchedulerTestBase(null, cleanupOperation, enabled);
- }
-
- /**
- * Create a test table executor with default enabled state (true)
- *
- * @param cleanupOperation the cleanup operation to use
- * @return a new PeriodicTableSchedulerTestBase instance
- */
- private PeriodicTableSchedulerTestBase createTestExecutor(CleanupOperation
cleanupOperation) {
- return createTestExecutor(cleanupOperation, true);
- }
-
- /**
- * Test whether the executor should execute a task for a given table runtime
and cleanup operation
- */
- @Test
- public void testShouldExecuteTaskWithNoPreviousCleanup() {
- List<CleanupOperation> operations =
Arrays.asList(CleanupOperation.SNAPSHOTS_EXPIRING);
-
- for (CleanupOperation operation : operations) {
- List<Long> testTableIds = Collections.singletonList(1L);
- prepareTestEnvironment(testTableIds);
-
- PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
- ServerTableIdentifier identifier = createTableIdentifier(1L);
- DefaultTableRuntime tableRuntime = createDefaultTableRuntime(identifier);
-
- boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime,
operation);
- Assert.assertTrue(
- "Should execute when there's no previous cleanup time for operation
" + operation,
- shouldExecute);
- }
- }
-
- /** Test should not execute task with recent cleanup */
- @Test
- public void testShouldNotExecuteTaskWithRecentCleanup() {
- List<CleanupOperation> operations =
Arrays.asList(CleanupOperation.SNAPSHOTS_EXPIRING);
-
- for (CleanupOperation operation : operations) {
- List<Long> testTableIds = Collections.singletonList(1L);
- cleanUpTableRuntimeData(testTableIds);
-
- PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
-
- // Create DefaultTableRuntime and set recent cleanup time
- ServerTableIdentifier identifier = createTableIdentifier(1L);
- DefaultTableRuntime tableRuntime = createDefaultTableRuntime(identifier);
-
- // Simulate recent cleanup
- long recentTime = System.currentTimeMillis() - 10000L;
- tableRuntime.updateLastCleanTime(operation, recentTime);
-
- boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime,
operation);
- Assert.assertFalse(
- "Should not execute when recently cleaned up for operation " +
operation, shouldExecute);
- }
- }
-
- /** Test should execute task with old cleanup */
- @Test
- public void testShouldExecuteTaskWithOldCleanup() {
- List<CleanupOperation> operations =
Arrays.asList(CleanupOperation.SNAPSHOTS_EXPIRING);
-
- for (CleanupOperation operation : operations) {
- List<Long> testTableIds = Collections.singletonList(1L);
- cleanUpTableRuntimeData(testTableIds);
-
- PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
-
- // Create DefaultTableRuntime and set old cleanup time
- ServerTableIdentifier identifier = createTableIdentifier(1L);
- DefaultTableRuntime tableRuntime = createDefaultTableRuntime(identifier);
-
- // Simulate old cleanup time (30 hours ago)
- long oldTime = System.currentTimeMillis() - 30 * 60 * 60 * 1000L;
- tableRuntime.updateLastCleanTime(operation, oldTime);
-
- boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime,
operation);
- Assert.assertTrue(
- "Should execute when enough time has passed since last cleanup for
operation "
- + operation,
- shouldExecute);
- }
- }
-
- @Test
- public void testShouldExecuteTaskWithNoneOperation() {
- List<Long> testTableIds = Collections.singletonList(1L);
- prepareTestEnvironment(testTableIds);
-
- PeriodicTableSchedulerTestBase executor =
createTestExecutor(CleanupOperation.NONE);
- ServerTableIdentifier identifier = createTableIdentifier(1L);
- DefaultTableRuntime tableRuntime = createDefaultTableRuntime(identifier);
-
- // Should always execute with NONE operation
- boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime,
CleanupOperation.NONE);
- Assert.assertTrue("Should always execute with NONE operation",
shouldExecute);
- }
-}
diff --git
a/amoro-common/src/test/java/org/apache/amoro/utils/JacksonUtilTest.java
b/amoro-common/src/test/java/org/apache/amoro/utils/JacksonUtilTest.java
index d9e559a7a..435b0f86e 100644
--- a/amoro-common/src/test/java/org/apache/amoro/utils/JacksonUtilTest.java
+++ b/amoro-common/src/test/java/org/apache/amoro/utils/JacksonUtilTest.java
@@ -37,7 +37,6 @@ public class JacksonUtilTest {
@Test
public void jacksonTest() {
-
JsonTestBean testObject = new JsonTestBean();
testObject.setBoolValue(true);
testObject.setIntValue(3);
@@ -85,7 +84,7 @@ public class JacksonUtilTest {
Map<String, Object> expireSnapshots = new HashMap<>();
expireSnapshots.put("enabled", true);
- expireSnapshots.put("thread-count", 10);
+ expireSnapshots.put("interval", "1h");
yamlMap.put("expire-snapshots", expireSnapshots);
List<Map<String, Object>> containersList = new ArrayList<>();
@@ -114,8 +113,7 @@ public class JacksonUtilTest {
JsonNode expiredSnapshots = yamlNode.get("expire-snapshots");
assertTrue(JacksonUtil.getBoolean(expiredSnapshots, "enabled"));
- Integer expectedThreadCount = 10;
- assertEquals(expectedThreadCount, JacksonUtil.getInteger(expiredSnapshots, "thread-count"));
+ assertEquals("1h", JacksonUtil.getString(expiredSnapshots, "interval"));
JsonNode containersNode = yamlNode.get("containers");
assertEquals(2, containersNode.size());
JsonNode secondNode = containersNode.get(1);
diff --git a/charts/amoro/templates/amoro-configmap.yaml b/charts/amoro/templates/amoro-configmap.yaml
index 0bfbfaf91..7fcde4974 100644
--- a/charts/amoro/templates/amoro-configmap.yaml
+++ b/charts/amoro/templates/amoro-configmap.yaml
@@ -99,10 +99,6 @@ data:
timeout: 1min # 60000
# optional features
- expire-snapshots:
- enabled: true
- thread-count: 10
-
sync-hive-tables:
enabled: false
thread-count: 10
diff --git a/dist/src/main/amoro-bin/conf/config.yaml b/dist/src/main/amoro-bin/conf/config.yaml
index 866eda219..4e6b66e8c 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -108,10 +108,6 @@ ams:
~
# optional features
- expire-snapshots:
- enabled: true
- thread-count: 10
-
sync-hive-tables:
enabled: false
thread-count: 10
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index 0a12a0038..d03d149cf 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -58,9 +58,6 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; }
| database.type | derby | Database type. |
| database.url | jdbc:derby:/tmp/amoro/derby;create=true | Database connection address |
| database.username | root | The username for connecting to the database. |
-| expire-snapshots.enabled | true | Enable snapshots expiring. |
-| expire-snapshots.interval | 1 h | Interval for expiring snapshots. |
-| expire-snapshots.thread-count | 10 | The number of threads used for snapshots expiring. |
| ha.bucket-assign.interval | 1 min | Interval for bucket assignment service to detect node changes and redistribute bucket IDs. |
| ha.bucket-id.total-count | 100 | Total count of bucket IDs for assignment. Bucket IDs range from 1 to this value. |
| ha.bucket-table-sync.interval | 1 min | Interval for syncing tables assigned to bucket IDs in master-slave mode. Each node periodically loads tables from database based on its assigned bucket IDs. |