This is an automated email from the ASF dual-hosted git repository.
zhoujinsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 1d4bc38be [AMORO-4223][ams] Fix AMS startup crash when recovering Iceberg maintenance processes (#4224)
1d4bc38be is described below
commit 1d4bc38be54604ed2b73c7632864c498a9621d02
Author: Darcy <[email protected]>
AuthorDate: Tue May 19 11:13:16 2026 +0800
[AMORO-4223][ams] Fix AMS startup crash when recovering Iceberg maintenance processes (#4224)
IcebergProcessFactory.recover() was a stub that unconditionally threw
RecoverProcessFailedException. Since ProcessService.recoverProcesses()
had no per-record error handling, any SUBMITTED/RUNNING Iceberg
expire-snapshots or clean-orphan-files record made AMS fail to start and
enter a crash loop with no automatic recovery.
Changes:
- Implement IcebergProcessFactory.recover(): dispatch by the persisted
action and rebuild SnapshotsExpiringProcess / OrphanFilesCleaningProcess
bound to the local engine. Both are stateless, idempotent one-shot local
maintenance tasks, so re-running them on recovery is safe.
- Harden ProcessService.recoverProcesses(): recover each record in its own
try/catch; on failure log it, mark the record FAILED and skip it so a
single un-recoverable record can no longer abort the whole AMS startup.
- Add Action.toString() returning the action name so diagnostic logs print
a readable name instead of org.apache.amoro.Action@hash.
- Add unit tests for IcebergProcessFactory.recover() and Action.toString(),
and a regression test covering the recoverProcesses() fail-safe path.
Co-authored-by: lintingbin <[email protected]>
Co-authored-by: ZhouJinsong <[email protected]>
---
.../amoro/server/process/ProcessService.java | 73 +++++++++++++++++++---
.../process/iceberg/IcebergProcessFactory.java | 19 +++++-
.../amoro/server/TestDefaultProcessService.java | 47 ++++++++++++++
.../process/ThrowingRecoverActionCoordinator.java | 43 +++++++++++++
.../process/iceberg/TestIcebergProcessFactory.java | 71 +++++++++++++++++++++
.../src/main/java/org/apache/amoro/Action.java | 5 ++
.../src/test/java/org/apache/amoro/TestAction.java | 38 +++++++++++
7 files changed, 285 insertions(+), 11 deletions(-)
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
index c3cac4efe..68446db8d 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
@@ -156,20 +156,73 @@ public class ProcessService extends PersistentBase {
ActionCoordinatorScheduler scheduler =
actionCoordinators.get(processMeta.getProcessType());
if (tableRuntime != null && scheduler != null) {
- DefaultTableProcessStore store =
- new DefaultTableProcessStore(
- processMeta.getProcessId(),
- tableRuntime,
- processMeta,
- scheduler.getAction(),
- processMeta.getRetryNumber());
- TableProcess process = scheduler.recover(tableRuntime, store);
- trackTableProcess(tableRuntime.getTableIdentifier(), store,
process);
- executeOrTraceProcess(store, process);
+ recoverProcess(tableRuntime, scheduler, processMeta);
}
});
}
+ /**
+ * Recover a single persisted process record. Any failure is contained here:
the offending record
+ * is marked {@link ProcessStatus#FAILED} and skipped, so that one
un-recoverable process record
+ * cannot abort the whole AMS startup (see AMORO-4223). The affected
maintenance action will be
+ * re-scheduled by its periodic scheduler.
+ *
+ * @param tableRuntime table runtime
+ * @param scheduler coordinator scheduler for the process type
+ * @param processMeta persisted process metadata
+ */
+ private void recoverProcess(
+ TableRuntime tableRuntime,
+ ActionCoordinatorScheduler scheduler,
+ TableProcessMeta processMeta) {
+ DefaultTableProcessStore store =
+ new DefaultTableProcessStore(
+ processMeta.getProcessId(),
+ tableRuntime,
+ processMeta,
+ scheduler.getAction(),
+ processMeta.getRetryNumber());
+ try {
+ TableProcess process = scheduler.recover(tableRuntime, store);
+ trackTableProcess(tableRuntime.getTableIdentifier(), store, process);
+ executeOrTraceProcess(store, process);
+ } catch (Throwable t) {
+ LOG.error(
+ "Failed to recover table process {} (action {}) for table {},
marking it FAILED "
+ + "and skipping so AMS can continue to start up.",
+ processMeta.getProcessId(),
+ scheduler.getAction(),
+ tableRuntime.getTableIdentifier(),
+ t);
+ markRecoverFailed(store, t);
+ }
+ }
+
+ /**
+ * Best-effort mark an un-recoverable process as {@link
ProcessStatus#FAILED} so it is not picked
+ * up again on the next AMS restart. Never throws.
+ *
+ * @param store process store
+ * @param cause the recovery failure
+ */
+ private void markRecoverFailed(DefaultTableProcessStore store, Throwable
cause) {
+ try {
+ store.tryTransitState(
+ ProcessStatus.FAILED,
+ ProcessEvent.COMPLETE_FAILED,
+ store.getExternalProcessIdentifier(),
+ "Failed to recover process on AMS startup: " + cause.getMessage(),
+ store.getProcessParameters(),
+ store.getSummary());
+ } catch (Throwable t) {
+ LOG.error(
+ "Failed to mark un-recoverable table process {} as FAILED; it may be
retried on the "
+ + "next AMS restart.",
+ store.getProcessId(),
+ t);
+ }
+ }
+
/**
* Execute the process or trace it if not executable.
*
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
index 813892595..db3ed8c74 100755
--- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
@@ -119,8 +119,25 @@ public class IcebergProcessFactory implements ProcessFactory {
@Override
public TableProcess recover(TableRuntime tableRuntime, TableProcessStore
store)
throws RecoverProcessFailedException {
+ Action action = store.getAction();
+ if (localEngine == null) {
+ throw new RecoverProcessFailedException(
+ "Local execution engine is not available for IcebergProcessFactory, "
+ + "cannot recover action: "
+ + action);
+ }
+
+ // SnapshotsExpiringProcess and OrphanFilesCleaningProcess are stateless,
idempotent
+ // one-shot local maintenance tasks (no checkpoint), so recovery simply
rebuilds the
+ // process so it can run again. The store/processId/tracking is owned by
ProcessService.
+ if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) {
+ return new SnapshotsExpiringProcess(tableRuntime, localEngine);
+ } else if (IcebergActions.DELETE_ORPHANS.equals(action)) {
+ return new OrphanFilesCleaningProcess(tableRuntime, localEngine);
+ }
+
throw new RecoverProcessFailedException(
- "Unsupported action for IcebergProcessFactory: " + store.getAction());
+ "Unsupported action for IcebergProcessFactory: " + action);
}
@Override
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
index 5b719aa73..c49bfe348 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
@@ -29,6 +29,7 @@ import org.apache.amoro.process.TableProcessStore;
import org.apache.amoro.server.process.MockActionCoordinator;
import org.apache.amoro.server.process.MockExecuteEngine;
import org.apache.amoro.server.process.ProcessService;
+import org.apache.amoro.server.process.ThrowingRecoverActionCoordinator;
import org.apache.amoro.server.table.AMSTableTestBase;
import org.junit.After;
import org.junit.Assert;
@@ -218,6 +219,52 @@ public class TestDefaultProcessService extends AMSTableTestBase {
}
}
+ /**
+ * Verify that a single un-recoverable process record does not abort AMS
startup: {@code
+ * recoverProcesses} must not propagate the failure, the bad record is
skipped and persisted as
+ * FAILED so a later restart neither throws nor re-picks it. Regression test
for AMORO-4223.
+ */
+ @Test(timeout = 60_000)
+ public void testRecoverProcessFailSafe() {
+ MockExecuteEngine executeEngine = getExecuteEngine();
+ try {
+ createTable();
+
+ awaitActiveInstances(executeEngine);
+
+ ProcessService.TableProcessHolder holder =
getAnyActiveTableProcessHolder();
+ TableProcessStore store = holder.getStore();
+ org.apache.amoro.TableRuntime tableRuntime =
holder.getProcess().getTableRuntime();
+
+ awaitEngineStatus(executeEngine, store.getExternalProcessIdentifier(),
ProcessStatus.RUNNING);
+ Assert.assertEquals(ProcessStatus.RUNNING, store.getStatus());
+
+ // Simulate an AMS restart where the process can no longer be recovered
(the exact
+ // condition that bricked AMS in AMORO-4223): stop tracking the live
instance, then
+ // swap in a coordinator whose recover() always fails.
+ processServiceService()
+ .untrackTableProcessInstance(tableRuntime.getTableIdentifier(),
store.getProcessId());
+ processServiceService().unInstallAllActionCoordinators();
+ processServiceService()
+ .installActionCoordinator(new
ThrowingRecoverActionCoordinator(executeEngine));
+
+ // Must NOT throw: the un-recoverable record is contained and AMS keeps
starting up.
+ processServiceService()
+ .recoverProcesses(new
ArrayList<>(Collections.singletonList(tableRuntime)));
+
Assert.assertTrue(processServiceService().getActiveTableProcess().isEmpty());
+
+ // The bad record is now persisted as FAILED, so it is no longer
"active": a subsequent
+ // restart neither throws nor re-picks it.
+ processServiceService()
+ .recoverProcesses(new
ArrayList<>(Collections.singletonList(tableRuntime)));
+
Assert.assertTrue(processServiceService().getActiveTableProcess().isEmpty());
+
+ dropTable();
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+
// ---------------------- Private helpers ----------------------
/** Return the first available execute engine and validate its presence. */
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/ThrowingRecoverActionCoordinator.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/ThrowingRecoverActionCoordinator.java
new file mode 100644
index 000000000..965bb3c2a
--- /dev/null
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/ThrowingRecoverActionCoordinator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.process;
+
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ExecuteEngine;
+import org.apache.amoro.process.RecoverProcessFailedException;
+import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
+
+/**
+ * Mock coordinator whose {@code recoverTableProcess} always fails, used to
verify that a single
+ * un-recoverable process record cannot abort the whole AMS startup (see
AMORO-4223).
+ */
+public class ThrowingRecoverActionCoordinator extends MockActionCoordinator {
+
+ public ThrowingRecoverActionCoordinator(ExecuteEngine executeEngine) {
+ super(executeEngine);
+ }
+
+ @Override
+ public TableProcess recoverTableProcess(
+ TableRuntime tableRuntime, TableProcessStore processStore) {
+ throw new RecoverProcessFailedException(
+ "Unsupported action for test coordinator: " +
processStore.getAction());
+ }
+}
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
index 825410296..9c429803f 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
@@ -21,12 +21,16 @@ package org.apache.amoro.server.process.iceberg;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import org.apache.amoro.Action;
import org.apache.amoro.IcebergActions;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.process.LocalExecutionEngine;
import org.apache.amoro.process.ProcessTriggerStrategy;
+import org.apache.amoro.process.RecoverProcessFailedException;
+import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.cleanup.TableRuntimeCleanupState;
import org.junit.Assert;
@@ -83,6 +87,73 @@ public class TestIcebergProcessFactory {
"clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE,
false, 0);
}
+ @Test
+ public void testRecoverExpireSnapshotsProcess() {
+ assertRecover(
+ "expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS,
SnapshotsExpiringProcess.class);
+ }
+
+ @Test
+ public void testRecoverOrphanFilesCleaningProcess() {
+ assertRecover(
+ "clean-orphan-files", IcebergActions.DELETE_ORPHANS,
OrphanFilesCleaningProcess.class);
+ }
+
+ @Test
+ public void testRecoverUnsupportedActionThrows() {
+ IcebergProcessFactory factory = openedFactory("expire-snapshots");
+
+ LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class);
+ doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name();
+ factory.availableExecuteEngines(Arrays.asList(localEngine));
+
+ TableProcessStore store = mock(TableProcessStore.class);
+ doReturn(IcebergActions.REWRITE).when(store).getAction();
+
+ Assert.assertThrows(
+ RecoverProcessFailedException.class,
+ () -> factory.recover(mock(TableRuntime.class), store));
+ }
+
+ @Test
+ public void testRecoverWithoutLocalEngineThrows() {
+ IcebergProcessFactory factory = openedFactory("expire-snapshots");
+
+ TableProcessStore store = mock(TableProcessStore.class);
+ doReturn(IcebergActions.EXPIRE_SNAPSHOTS).when(store).getAction();
+
+ Assert.assertThrows(
+ RecoverProcessFailedException.class,
+ () -> factory.recover(mock(TableRuntime.class), store));
+ }
+
+ private void assertRecover(String configKey, Action action, Class<?>
processClass) {
+ IcebergProcessFactory factory = openedFactory(configKey);
+
+ LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class);
+ doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name();
+ factory.availableExecuteEngines(Arrays.asList(localEngine));
+
+ TableProcessStore store = mock(TableProcessStore.class);
+ doReturn(action).when(store).getAction();
+
+ TableProcess process = factory.recover(mock(TableRuntime.class), store);
+
+ Assert.assertNotNull(process);
+ Assert.assertTrue(processClass.isInstance(process));
+ Assert.assertEquals(action, process.getAction());
+ Assert.assertEquals(LocalExecutionEngine.ENGINE_NAME,
process.getExecutionEngine());
+ }
+
+ private IcebergProcessFactory openedFactory(String configKey) {
+ IcebergProcessFactory factory = new IcebergProcessFactory();
+ Map<String, String> properties = new HashMap<>();
+ properties.put(configKey + ".enabled", "true");
+ properties.put(configKey + ".interval", "1h");
+ factory.open(properties);
+ return factory;
+ }
+
private void assertSupportedAction(
String configKey, org.apache.amoro.Action action, Duration interval) {
IcebergProcessFactory factory = new IcebergProcessFactory();
diff --git a/amoro-common/src/main/java/org/apache/amoro/Action.java b/amoro-common/src/main/java/org/apache/amoro/Action.java
index 826006a31..11b53d0e2 100644
--- a/amoro-common/src/main/java/org/apache/amoro/Action.java
+++ b/amoro-common/src/main/java/org/apache/amoro/Action.java
@@ -69,4 +69,9 @@ public final class Action {
public int hashCode() {
return Objects.hash(name);
}
+
+ @Override
+ public String toString() {
+ return name;
+ }
}
diff --git a/amoro-common/src/test/java/org/apache/amoro/TestAction.java b/amoro-common/src/test/java/org/apache/amoro/TestAction.java
new file mode 100644
index 000000000..a7a4d739d
--- /dev/null
+++ b/amoro-common/src/test/java/org/apache/amoro/TestAction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+
+public class TestAction {
+
+ /**
+ * {@link Action#toString()} must return the action name so that diagnostic
logs print a readable
+ * name instead of {@code org.apache.amoro.Action@hash} (see AMORO-4223).
+ */
+ @Test
+ public void testToStringReturnsName() {
+ Action action = Action.register("expire-snapshots");
+ assertEquals("EXPIRE-SNAPSHOTS", action.toString());
+ assertEquals(action.getName(), action.toString());
+ assertEquals("recover action: EXPIRE-SNAPSHOTS", "recover action: " +
action);
+ }
+}