This is an automated email from the ASF dual-hosted git repository.

zhoujinsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d4bc38be [AMORO-4223][ams] Fix AMS startup crash when recovering Iceberg maintenance processes (#4224)
1d4bc38be is described below

commit 1d4bc38be54604ed2b73c7632864c498a9621d02
Author: Darcy <[email protected]>
AuthorDate: Tue May 19 11:13:16 2026 +0800

    [AMORO-4223][ams] Fix AMS startup crash when recovering Iceberg maintenance processes (#4224)
    
    IcebergProcessFactory.recover() was a stub that unconditionally threw
    RecoverProcessFailedException. Because ProcessService.recoverProcesses()
    had no per-record error handling, any persisted SUBMITTED/RUNNING Iceberg
    expire-snapshots or clean-orphan-files record made AMS fail on startup
    and enter a crash loop with no automatic recovery.
    
    Changes:
    - Implement IcebergProcessFactory.recover(): dispatch by the persisted
      action and rebuild SnapshotsExpiringProcess / OrphanFilesCleaningProcess
      bound to the local engine. Both are stateless, idempotent one-shot local
      maintenance tasks, so re-running them on recovery is safe.
    - Harden ProcessService.recoverProcesses(): recover each record in its own
      try/catch; on failure, log the error, mark the record FAILED and skip it,
      so that a single unrecoverable record can no longer abort the whole AMS
      startup.
    - Add Action.toString() returning the action name so diagnostic logs print
      a readable name instead of org.apache.amoro.Action@hash.
    - Add unit tests for IcebergProcessFactory.recover() and Action.toString(),
      and a regression test covering the recoverProcesses() fail-safe path.
    
    Co-authored-by: lintingbin <[email protected]>
    Co-authored-by: ZhouJinsong <[email protected]>
---
 .../amoro/server/process/ProcessService.java       | 73 +++++++++++++++++++---
 .../process/iceberg/IcebergProcessFactory.java     | 19 +++++-
 .../amoro/server/TestDefaultProcessService.java    | 47 ++++++++++++++
 .../process/ThrowingRecoverActionCoordinator.java  | 43 +++++++++++++
 .../process/iceberg/TestIcebergProcessFactory.java | 71 +++++++++++++++++++++
 .../src/main/java/org/apache/amoro/Action.java     |  5 ++
 .../src/test/java/org/apache/amoro/TestAction.java | 38 +++++++++++
 7 files changed, 285 insertions(+), 11 deletions(-)

diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
index c3cac4efe..68446db8d 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
@@ -156,20 +156,73 @@ public class ProcessService extends PersistentBase {
           ActionCoordinatorScheduler scheduler =
               actionCoordinators.get(processMeta.getProcessType());
           if (tableRuntime != null && scheduler != null) {
-            DefaultTableProcessStore store =
-                new DefaultTableProcessStore(
-                    processMeta.getProcessId(),
-                    tableRuntime,
-                    processMeta,
-                    scheduler.getAction(),
-                    processMeta.getRetryNumber());
-            TableProcess process = scheduler.recover(tableRuntime, store);
-            trackTableProcess(tableRuntime.getTableIdentifier(), store, 
process);
-            executeOrTraceProcess(store, process);
+            recoverProcess(tableRuntime, scheduler, processMeta);
           }
         });
   }
 
+  /**
+   * Recover a single persisted process record. Any failure is contained here: 
the offending record
+   * is marked {@link ProcessStatus#FAILED} and skipped, so that one 
un-recoverable process record
+   * cannot abort the whole AMS startup (see AMORO-4223). The affected 
maintenance action will be
+   * re-scheduled by its periodic scheduler.
+   *
+   * @param tableRuntime table runtime
+   * @param scheduler coordinator scheduler for the process type
+   * @param processMeta persisted process metadata
+   */
+  private void recoverProcess(
+      TableRuntime tableRuntime,
+      ActionCoordinatorScheduler scheduler,
+      TableProcessMeta processMeta) {
+    DefaultTableProcessStore store =
+        new DefaultTableProcessStore(
+            processMeta.getProcessId(),
+            tableRuntime,
+            processMeta,
+            scheduler.getAction(),
+            processMeta.getRetryNumber());
+    try {
+      TableProcess process = scheduler.recover(tableRuntime, store);
+      trackTableProcess(tableRuntime.getTableIdentifier(), store, process);
+      executeOrTraceProcess(store, process);
+    } catch (Throwable t) {
+      LOG.error(
+          "Failed to recover table process {} (action {}) for table {}, 
marking it FAILED "
+              + "and skipping so AMS can continue to start up.",
+          processMeta.getProcessId(),
+          scheduler.getAction(),
+          tableRuntime.getTableIdentifier(),
+          t);
+      markRecoverFailed(store, t);
+    }
+  }
+
+  /**
+   * Best-effort mark an un-recoverable process as {@link 
ProcessStatus#FAILED} so it is not picked
+   * up again on the next AMS restart. Never throws.
+   *
+   * @param store process store
+   * @param cause the recovery failure
+   */
+  private void markRecoverFailed(DefaultTableProcessStore store, Throwable 
cause) {
+    try {
+      store.tryTransitState(
+          ProcessStatus.FAILED,
+          ProcessEvent.COMPLETE_FAILED,
+          store.getExternalProcessIdentifier(),
+          "Failed to recover process on AMS startup: " + cause.getMessage(),
+          store.getProcessParameters(),
+          store.getSummary());
+    } catch (Throwable t) {
+      LOG.error(
+          "Failed to mark un-recoverable table process {} as FAILED; it may be 
retried on the "
+              + "next AMS restart.",
+          store.getProcessId(),
+          t);
+    }
+  }
+
   /**
    * Execute the process or trace it if not executable.
    *
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
index 813892595..db3ed8c74 100755
--- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
@@ -119,8 +119,25 @@ public class IcebergProcessFactory implements ProcessFactory {
   @Override
   public TableProcess recover(TableRuntime tableRuntime, TableProcessStore 
store)
       throws RecoverProcessFailedException {
+    Action action = store.getAction();
+    if (localEngine == null) {
+      throw new RecoverProcessFailedException(
+          "Local execution engine is not available for IcebergProcessFactory, "
+              + "cannot recover action: "
+              + action);
+    }
+
+    // SnapshotsExpiringProcess and OrphanFilesCleaningProcess are stateless, 
idempotent
+    // one-shot local maintenance tasks (no checkpoint), so recovery simply 
rebuilds the
+    // process so it can run again. The store/processId/tracking is owned by 
ProcessService.
+    if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) {
+      return new SnapshotsExpiringProcess(tableRuntime, localEngine);
+    } else if (IcebergActions.DELETE_ORPHANS.equals(action)) {
+      return new OrphanFilesCleaningProcess(tableRuntime, localEngine);
+    }
+
     throw new RecoverProcessFailedException(
-        "Unsupported action for IcebergProcessFactory: " + store.getAction());
+        "Unsupported action for IcebergProcessFactory: " + action);
   }
 
   @Override
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
index 5b719aa73..c49bfe348 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
@@ -29,6 +29,7 @@ import org.apache.amoro.process.TableProcessStore;
 import org.apache.amoro.server.process.MockActionCoordinator;
 import org.apache.amoro.server.process.MockExecuteEngine;
 import org.apache.amoro.server.process.ProcessService;
+import org.apache.amoro.server.process.ThrowingRecoverActionCoordinator;
 import org.apache.amoro.server.table.AMSTableTestBase;
 import org.junit.After;
 import org.junit.Assert;
@@ -218,6 +219,52 @@ public class TestDefaultProcessService extends AMSTableTestBase {
     }
   }
 
+  /**
+   * Verify that a single un-recoverable process record does not abort AMS 
startup: {@code
+   * recoverProcesses} must not propagate the failure, the bad record is 
skipped and persisted as
+   * FAILED so a later restart neither throws nor re-picks it. Regression test 
for AMORO-4223.
+   */
+  @Test(timeout = 60_000)
+  public void testRecoverProcessFailSafe() {
+    MockExecuteEngine executeEngine = getExecuteEngine();
+    try {
+      createTable();
+
+      awaitActiveInstances(executeEngine);
+
+      ProcessService.TableProcessHolder holder = 
getAnyActiveTableProcessHolder();
+      TableProcessStore store = holder.getStore();
+      org.apache.amoro.TableRuntime tableRuntime = 
holder.getProcess().getTableRuntime();
+
+      awaitEngineStatus(executeEngine, store.getExternalProcessIdentifier(), 
ProcessStatus.RUNNING);
+      Assert.assertEquals(ProcessStatus.RUNNING, store.getStatus());
+
+      // Simulate an AMS restart where the process can no longer be recovered 
(the exact
+      // condition that bricked AMS in AMORO-4223): stop tracking the live 
instance, then
+      // swap in a coordinator whose recover() always fails.
+      processServiceService()
+          .untrackTableProcessInstance(tableRuntime.getTableIdentifier(), 
store.getProcessId());
+      processServiceService().unInstallAllActionCoordinators();
+      processServiceService()
+          .installActionCoordinator(new 
ThrowingRecoverActionCoordinator(executeEngine));
+
+      // Must NOT throw: the un-recoverable record is contained and AMS keeps 
starting up.
+      processServiceService()
+          .recoverProcesses(new 
ArrayList<>(Collections.singletonList(tableRuntime)));
+      
Assert.assertTrue(processServiceService().getActiveTableProcess().isEmpty());
+
+      // The bad record is now persisted as FAILED, so it is no longer 
"active": a subsequent
+      // restart neither throws nor re-picks it.
+      processServiceService()
+          .recoverProcesses(new 
ArrayList<>(Collections.singletonList(tableRuntime)));
+      
Assert.assertTrue(processServiceService().getActiveTableProcess().isEmpty());
+
+      dropTable();
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+  }
+
   // ---------------------- Private helpers ----------------------
 
   /** Return the first available execute engine and validate its presence. */
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/ThrowingRecoverActionCoordinator.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/ThrowingRecoverActionCoordinator.java
new file mode 100644
index 000000000..965bb3c2a
--- /dev/null
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/ThrowingRecoverActionCoordinator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.process;
+
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ExecuteEngine;
+import org.apache.amoro.process.RecoverProcessFailedException;
+import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
+
+/**
+ * Mock coordinator whose {@code recoverTableProcess} always fails, used to 
verify that a single
+ * un-recoverable process record cannot abort the whole AMS startup (see 
AMORO-4223).
+ */
+public class ThrowingRecoverActionCoordinator extends MockActionCoordinator {
+
+  public ThrowingRecoverActionCoordinator(ExecuteEngine executeEngine) {
+    super(executeEngine);
+  }
+
+  @Override
+  public TableProcess recoverTableProcess(
+      TableRuntime tableRuntime, TableProcessStore processStore) {
+    throw new RecoverProcessFailedException(
+        "Unsupported action for test coordinator: " + 
processStore.getAction());
+  }
+}
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
index 825410296..9c429803f 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
@@ -21,12 +21,16 @@ package org.apache.amoro.server.process.iceberg;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
+import org.apache.amoro.Action;
 import org.apache.amoro.IcebergActions;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.process.LocalExecutionEngine;
 import org.apache.amoro.process.ProcessTriggerStrategy;
+import org.apache.amoro.process.RecoverProcessFailedException;
+import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
 import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.cleanup.TableRuntimeCleanupState;
 import org.junit.Assert;
@@ -83,6 +87,73 @@ public class TestIcebergProcessFactory {
         "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, 
false, 0);
   }
 
+  @Test
+  public void testRecoverExpireSnapshotsProcess() {
+    assertRecover(
+        "expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, 
SnapshotsExpiringProcess.class);
+  }
+
+  @Test
+  public void testRecoverOrphanFilesCleaningProcess() {
+    assertRecover(
+        "clean-orphan-files", IcebergActions.DELETE_ORPHANS, 
OrphanFilesCleaningProcess.class);
+  }
+
+  @Test
+  public void testRecoverUnsupportedActionThrows() {
+    IcebergProcessFactory factory = openedFactory("expire-snapshots");
+
+    LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class);
+    doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name();
+    factory.availableExecuteEngines(Arrays.asList(localEngine));
+
+    TableProcessStore store = mock(TableProcessStore.class);
+    doReturn(IcebergActions.REWRITE).when(store).getAction();
+
+    Assert.assertThrows(
+        RecoverProcessFailedException.class,
+        () -> factory.recover(mock(TableRuntime.class), store));
+  }
+
+  @Test
+  public void testRecoverWithoutLocalEngineThrows() {
+    IcebergProcessFactory factory = openedFactory("expire-snapshots");
+
+    TableProcessStore store = mock(TableProcessStore.class);
+    doReturn(IcebergActions.EXPIRE_SNAPSHOTS).when(store).getAction();
+
+    Assert.assertThrows(
+        RecoverProcessFailedException.class,
+        () -> factory.recover(mock(TableRuntime.class), store));
+  }
+
+  private void assertRecover(String configKey, Action action, Class<?> 
processClass) {
+    IcebergProcessFactory factory = openedFactory(configKey);
+
+    LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class);
+    doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name();
+    factory.availableExecuteEngines(Arrays.asList(localEngine));
+
+    TableProcessStore store = mock(TableProcessStore.class);
+    doReturn(action).when(store).getAction();
+
+    TableProcess process = factory.recover(mock(TableRuntime.class), store);
+
+    Assert.assertNotNull(process);
+    Assert.assertTrue(processClass.isInstance(process));
+    Assert.assertEquals(action, process.getAction());
+    Assert.assertEquals(LocalExecutionEngine.ENGINE_NAME, 
process.getExecutionEngine());
+  }
+
+  private IcebergProcessFactory openedFactory(String configKey) {
+    IcebergProcessFactory factory = new IcebergProcessFactory();
+    Map<String, String> properties = new HashMap<>();
+    properties.put(configKey + ".enabled", "true");
+    properties.put(configKey + ".interval", "1h");
+    factory.open(properties);
+    return factory;
+  }
+
   private void assertSupportedAction(
       String configKey, org.apache.amoro.Action action, Duration interval) {
     IcebergProcessFactory factory = new IcebergProcessFactory();
diff --git a/amoro-common/src/main/java/org/apache/amoro/Action.java b/amoro-common/src/main/java/org/apache/amoro/Action.java
index 826006a31..11b53d0e2 100644
--- a/amoro-common/src/main/java/org/apache/amoro/Action.java
+++ b/amoro-common/src/main/java/org/apache/amoro/Action.java
@@ -69,4 +69,9 @@ public final class Action {
   public int hashCode() {
     return Objects.hash(name);
   }
+
+  @Override
+  public String toString() {
+    return name;
+  }
 }
diff --git a/amoro-common/src/test/java/org/apache/amoro/TestAction.java b/amoro-common/src/test/java/org/apache/amoro/TestAction.java
new file mode 100644
index 000000000..a7a4d739d
--- /dev/null
+++ b/amoro-common/src/test/java/org/apache/amoro/TestAction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+
+public class TestAction {
+
+  /**
+   * {@link Action#toString()} must return the action name so that diagnostic 
logs print a readable
+   * name instead of {@code org.apache.amoro.Action@hash} (see AMORO-4223).
+   */
+  @Test
+  public void testToStringReturnsName() {
+    Action action = Action.register("expire-snapshots");
+    assertEquals("EXPIRE-SNAPSHOTS", action.toString());
+    assertEquals(action.getName(), action.toString());
+    assertEquals("recover action: EXPIRE-SNAPSHOTS", "recover action: " + 
action);
+  }
+}

Reply via email to