This is an automated email from the ASF dual-hosted git repository. czy006 pushed a commit to branch 0.9.x in repository https://gitbox.apache.org/repos/asf/amoro.git
commit 36a5a9ced0f541e05a6c3ae747309dfddbef9705
Author: Darcy <[email protected]>
AuthorDate: Tue May 19 11:13:16 2026 +0800

[AMORO-4223][ams] Fix AMS startup crash when recovering Iceberg maintenance processes (#4224)

IcebergProcessFactory.recover() was a stub that unconditionally threw RecoverProcessFailedException. Since ProcessService.recoverProcesses() had no per-record error handling, any SUBMITTED/RUNNING Iceberg expire-snapshots or clean-orphan-files record made AMS fail to start and enter a crash loop with no automatic recovery.

Changes:
- Implement IcebergProcessFactory.recover(): dispatch by the persisted action and rebuild SnapshotsExpiringProcess / OrphanFilesCleaningProcess bound to the local engine. Both are stateless, idempotent one-shot local maintenance tasks, so re-running them on recovery is safe.
- Harden ProcessService.recoverProcesses(): recover each record in its own try/catch; on failure log it, mark the record FAILED and skip it so a single un-recoverable record can no longer abort the whole AMS startup.
- Add Action.toString() returning the action name so diagnostic logs print a readable name instead of org.apache.amoro.Action@hash.
- Add unit tests for IcebergProcessFactory.recover() and Action.toString(), and a regression test covering the recoverProcesses() fail-safe path.

Co-authored-by: lintingbin <[email protected]> Co-authored-by: ZhouJinsong <[email protected]> (cherry picked from commit 1d4bc38be54604ed2b73c7632864c498a9621d02) --- .../amoro/server/process/ProcessService.java | 73 +++++++++++++++++++--- .../process/iceberg/IcebergProcessFactory.java | 19 +++++- .../amoro/server/TestDefaultProcessService.java | 47 ++++++++++++++ .../process/ThrowingRecoverActionCoordinator.java | 43 +++++++++++++ .../process/iceberg/TestIcebergProcessFactory.java | 71 +++++++++++++++++++++ .../src/main/java/org/apache/amoro/Action.java | 5 ++ .../src/test/java/org/apache/amoro/TestAction.java | 38 +++++++++++ 7 files changed, 285 insertions(+), 11 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index c3cac4efe..68446db8d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -156,20 +156,73 @@ public class ProcessService extends PersistentBase { ActionCoordinatorScheduler scheduler = actionCoordinators.get(processMeta.getProcessType()); if (tableRuntime != null && scheduler != null) { - DefaultTableProcessStore store = - new DefaultTableProcessStore( - processMeta.getProcessId(), - tableRuntime, - processMeta, - scheduler.getAction(), - processMeta.getRetryNumber()); - TableProcess process = scheduler.recover(tableRuntime, store); - trackTableProcess(tableRuntime.getTableIdentifier(), store, process); - executeOrTraceProcess(store, process); + recoverProcess(tableRuntime, scheduler, processMeta); } }); } + /** + * Recover a single persisted process record. Any failure is contained here: the offending record + * is marked {@link ProcessStatus#FAILED} and skipped, so that one un-recoverable process record + * cannot abort the whole AMS startup (see AMORO-4223). 
The affected maintenance action will be + * re-scheduled by its periodic scheduler. + * + * @param tableRuntime table runtime + * @param scheduler coordinator scheduler for the process type + * @param processMeta persisted process metadata + */ + private void recoverProcess( + TableRuntime tableRuntime, + ActionCoordinatorScheduler scheduler, + TableProcessMeta processMeta) { + DefaultTableProcessStore store = + new DefaultTableProcessStore( + processMeta.getProcessId(), + tableRuntime, + processMeta, + scheduler.getAction(), + processMeta.getRetryNumber()); + try { + TableProcess process = scheduler.recover(tableRuntime, store); + trackTableProcess(tableRuntime.getTableIdentifier(), store, process); + executeOrTraceProcess(store, process); + } catch (Throwable t) { + LOG.error( + "Failed to recover table process {} (action {}) for table {}, marking it FAILED " + + "and skipping so AMS can continue to start up.", + processMeta.getProcessId(), + scheduler.getAction(), + tableRuntime.getTableIdentifier(), + t); + markRecoverFailed(store, t); + } + } + + /** + * Best-effort mark an un-recoverable process as {@link ProcessStatus#FAILED} so it is not picked + * up again on the next AMS restart. Never throws. + * + * @param store process store + * @param cause the recovery failure + */ + private void markRecoverFailed(DefaultTableProcessStore store, Throwable cause) { + try { + store.tryTransitState( + ProcessStatus.FAILED, + ProcessEvent.COMPLETE_FAILED, + store.getExternalProcessIdentifier(), + "Failed to recover process on AMS startup: " + cause.getMessage(), + store.getProcessParameters(), + store.getSummary()); + } catch (Throwable t) { + LOG.error( + "Failed to mark un-recoverable table process {} as FAILED; it may be retried on the " + + "next AMS restart.", + store.getProcessId(), + t); + } + } + /** * Execute the process or trace it if not executable. 
* diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java index 813892595..db3ed8c74 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java @@ -119,8 +119,25 @@ public class IcebergProcessFactory implements ProcessFactory { @Override public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store) throws RecoverProcessFailedException { + Action action = store.getAction(); + if (localEngine == null) { + throw new RecoverProcessFailedException( + "Local execution engine is not available for IcebergProcessFactory, " + + "cannot recover action: " + + action); + } + + // SnapshotsExpiringProcess and OrphanFilesCleaningProcess are stateless, idempotent + // one-shot local maintenance tasks (no checkpoint), so recovery simply rebuilds the + // process so it can run again. The store/processId/tracking is owned by ProcessService. 
+ if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) { + return new SnapshotsExpiringProcess(tableRuntime, localEngine); + } else if (IcebergActions.DELETE_ORPHANS.equals(action)) { + return new OrphanFilesCleaningProcess(tableRuntime, localEngine); + } + throw new RecoverProcessFailedException( - "Unsupported action for IcebergProcessFactory: " + store.getAction()); + "Unsupported action for IcebergProcessFactory: " + action); } @Override diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java index 5b719aa73..c49bfe348 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java @@ -29,6 +29,7 @@ import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.process.MockActionCoordinator; import org.apache.amoro.server.process.MockExecuteEngine; import org.apache.amoro.server.process.ProcessService; +import org.apache.amoro.server.process.ThrowingRecoverActionCoordinator; import org.apache.amoro.server.table.AMSTableTestBase; import org.junit.After; import org.junit.Assert; @@ -218,6 +219,52 @@ public class TestDefaultProcessService extends AMSTableTestBase { } } + /** + * Verify that a single un-recoverable process record does not abort AMS startup: {@code + * recoverProcesses} must not propagate the failure, the bad record is skipped and persisted as + * FAILED so a later restart neither throws nor re-picks it. Regression test for AMORO-4223. 
+ */ + @Test(timeout = 60_000) + public void testRecoverProcessFailSafe() { + MockExecuteEngine executeEngine = getExecuteEngine(); + try { + createTable(); + + awaitActiveInstances(executeEngine); + + ProcessService.TableProcessHolder holder = getAnyActiveTableProcessHolder(); + TableProcessStore store = holder.getStore(); + org.apache.amoro.TableRuntime tableRuntime = holder.getProcess().getTableRuntime(); + + awaitEngineStatus(executeEngine, store.getExternalProcessIdentifier(), ProcessStatus.RUNNING); + Assert.assertEquals(ProcessStatus.RUNNING, store.getStatus()); + + // Simulate an AMS restart where the process can no longer be recovered (the exact + // condition that bricked AMS in AMORO-4223): stop tracking the live instance, then + // swap in a coordinator whose recover() always fails. + processServiceService() + .untrackTableProcessInstance(tableRuntime.getTableIdentifier(), store.getProcessId()); + processServiceService().unInstallAllActionCoordinators(); + processServiceService() + .installActionCoordinator(new ThrowingRecoverActionCoordinator(executeEngine)); + + // Must NOT throw: the un-recoverable record is contained and AMS keeps starting up. + processServiceService() + .recoverProcesses(new ArrayList<>(Collections.singletonList(tableRuntime))); + Assert.assertTrue(processServiceService().getActiveTableProcess().isEmpty()); + + // The bad record is now persisted as FAILED, so it is no longer "active": a subsequent + // restart neither throws nor re-picks it. + processServiceService() + .recoverProcesses(new ArrayList<>(Collections.singletonList(tableRuntime))); + Assert.assertTrue(processServiceService().getActiveTableProcess().isEmpty()); + + dropTable(); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + // ---------------------- Private helpers ---------------------- /** Return the first available execute engine and validate its presence. 
*/ diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/ThrowingRecoverActionCoordinator.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/ThrowingRecoverActionCoordinator.java new file mode 100644 index 000000000..965bb3c2a --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/ThrowingRecoverActionCoordinator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process; + +import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ExecuteEngine; +import org.apache.amoro.process.RecoverProcessFailedException; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.process.TableProcessStore; + +/** + * Mock coordinator whose {@code recoverTableProcess} always fails, used to verify that a single + * un-recoverable process record cannot abort the whole AMS startup (see AMORO-4223). 
+ */ +public class ThrowingRecoverActionCoordinator extends MockActionCoordinator { + + public ThrowingRecoverActionCoordinator(ExecuteEngine executeEngine) { + super(executeEngine); + } + + @Override + public TableProcess recoverTableProcess( + TableRuntime tableRuntime, TableProcessStore processStore) { + throw new RecoverProcessFailedException( + "Unsupported action for test coordinator: " + processStore.getAction()); + } +} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java index 825410296..9c429803f 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java @@ -21,12 +21,16 @@ package org.apache.amoro.server.process.iceberg; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import org.apache.amoro.Action; import org.apache.amoro.IcebergActions; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.process.LocalExecutionEngine; import org.apache.amoro.process.ProcessTriggerStrategy; +import org.apache.amoro.process.RecoverProcessFailedException; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.table.cleanup.TableRuntimeCleanupState; import org.junit.Assert; @@ -83,6 +87,73 @@ public class TestIcebergProcessFactory { "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, false, 0); } + @Test + public void testRecoverExpireSnapshotsProcess() { + assertRecover( + "expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, SnapshotsExpiringProcess.class); + } + + @Test + public void 
testRecoverOrphanFilesCleaningProcess() { + assertRecover( + "clean-orphan-files", IcebergActions.DELETE_ORPHANS, OrphanFilesCleaningProcess.class); + } + + @Test + public void testRecoverUnsupportedActionThrows() { + IcebergProcessFactory factory = openedFactory("expire-snapshots"); + + LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class); + doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name(); + factory.availableExecuteEngines(Arrays.asList(localEngine)); + + TableProcessStore store = mock(TableProcessStore.class); + doReturn(IcebergActions.REWRITE).when(store).getAction(); + + Assert.assertThrows( + RecoverProcessFailedException.class, + () -> factory.recover(mock(TableRuntime.class), store)); + } + + @Test + public void testRecoverWithoutLocalEngineThrows() { + IcebergProcessFactory factory = openedFactory("expire-snapshots"); + + TableProcessStore store = mock(TableProcessStore.class); + doReturn(IcebergActions.EXPIRE_SNAPSHOTS).when(store).getAction(); + + Assert.assertThrows( + RecoverProcessFailedException.class, + () -> factory.recover(mock(TableRuntime.class), store)); + } + + private void assertRecover(String configKey, Action action, Class<?> processClass) { + IcebergProcessFactory factory = openedFactory(configKey); + + LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class); + doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name(); + factory.availableExecuteEngines(Arrays.asList(localEngine)); + + TableProcessStore store = mock(TableProcessStore.class); + doReturn(action).when(store).getAction(); + + TableProcess process = factory.recover(mock(TableRuntime.class), store); + + Assert.assertNotNull(process); + Assert.assertTrue(processClass.isInstance(process)); + Assert.assertEquals(action, process.getAction()); + Assert.assertEquals(LocalExecutionEngine.ENGINE_NAME, process.getExecutionEngine()); + } + + private IcebergProcessFactory openedFactory(String configKey) { + IcebergProcessFactory 
factory = new IcebergProcessFactory(); + Map<String, String> properties = new HashMap<>(); + properties.put(configKey + ".enabled", "true"); + properties.put(configKey + ".interval", "1h"); + factory.open(properties); + return factory; + } + private void assertSupportedAction( String configKey, org.apache.amoro.Action action, Duration interval) { IcebergProcessFactory factory = new IcebergProcessFactory(); diff --git a/amoro-common/src/main/java/org/apache/amoro/Action.java b/amoro-common/src/main/java/org/apache/amoro/Action.java index 826006a31..11b53d0e2 100644 --- a/amoro-common/src/main/java/org/apache/amoro/Action.java +++ b/amoro-common/src/main/java/org/apache/amoro/Action.java @@ -69,4 +69,9 @@ public final class Action { public int hashCode() { return Objects.hash(name); } + + @Override + public String toString() { + return name; + } } diff --git a/amoro-common/src/test/java/org/apache/amoro/TestAction.java b/amoro-common/src/test/java/org/apache/amoro/TestAction.java new file mode 100644 index 000000000..a7a4d739d --- /dev/null +++ b/amoro-common/src/test/java/org/apache/amoro/TestAction.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +public class TestAction { + + /** + * {@link Action#toString()} must return the action name so that diagnostic logs print a readable + * name instead of {@code org.apache.amoro.Action@hash} (see AMORO-4223). + */ + @Test + public void testToStringReturnsName() { + Action action = Action.register("expire-snapshots"); + assertEquals("EXPIRE-SNAPSHOTS", action.toString()); + assertEquals(action.getName(), action.toString()); + assertEquals("recover action: EXPIRE-SNAPSHOTS", "recover action: " + action); + } +}
