This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 191414520 [AMORO-4089] Fix optimizing process stuck permanently after AMS restart (#4090)
191414520 is described below

commit 19141452088c891c85ad5fc88f2c2b0837b9bf9c
Author: Jiwon Park <[email protected]>
AuthorDate: Wed Mar 4 20:50:45 2026 +0900

    [AMORO-4089] Fix optimizing process stuck permanently after AMS restart (#4090)
    
    * [AMORO-4089] Fix optimizing process permanently stuck after AMS restart
    
    Fix multiple scenarios where tables become permanently stuck after
    AMS restart by refactoring initTableRuntime() recovery logic, fixing
    OptimizerKeeper race condition, and adding defensive measures.
    
    Signed-off-by: Jiwon Park <[email protected]>
    
    * [AMORO-4089] Skip resetting PENDING tables during recovery
    
    PENDING tables (processId=0) should not be reset via
    completeEmptyProcess() as it incorrectly marks existing snapshots
    as already optimized. Simply re-add them to the scheduler instead.
    
    Also use separate optimizer threads in quota tests to avoid
    interference with resetStaleTasksForThread.
    
    Signed-off-by: Jiwon Park <[email protected]>
    
    * [AMORO-4089] Avoid adding completed process to tableQueue during recovery
    
    When all tasks are already completed during recovery, skip adding the
    process to tableQueue and directly trigger beginCommitting() to prevent
    an empty shell process from lingering in the queue.
    
    Signed-off-by: Jiwon Park <[email protected]>
    
    * [AMORO-4089] Add Javadoc to completeEmptyProcess() documenting broader scope
    
    Document that completeEmptyProcess() resets the table from any non-IDLE
    state, not just PLANNING/PENDING, as it is now also used during startup
    recovery for tables with unrecoverable processes.
    
    Signed-off-by: Jiwon Park <[email protected]>
    
    * [AMORO-4089] Remove redundant KILLED status guard in recovery constructor
    
    canResumeProcess() already filters out KILLED processes, so the
    status != KILLED guard in the recovery constructor is unnecessary.
    Remove it to clean up legacy code as part of the refactoring.
    
    Signed-off-by: Jiwon Park <[email protected]>
    
    * [AMORO-4089] Lower recovery log level for SCHEDULED/ACKED tasks to DEBUG
    
    Transient recovery state logs can be noisy in clusters with many tables.
    
    Signed-off-by: Jiwon Park <[email protected]>
    
    ---------
    
    Signed-off-by: Jiwon Park <[email protected]>
    Co-authored-by: Xu Bai <[email protected]>
---
 .../amoro/server/DefaultOptimizingService.java     |   9 +-
 .../amoro/server/optimizing/OptimizingQueue.java   | 139 ++++++++++---
 .../amoro/server/table/DefaultTableRuntime.java    |  38 ++--
 .../amoro/server/TestDefaultOptimizingService.java | 220 ++++++++++++++++++++-
 .../server/optimizing/TestOptimizingQueue.java     |  19 +-
 5 files changed, 366 insertions(+), 59 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index f3b13e69a..9be63dbaf 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -490,16 +490,17 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
           OptimizerKeepingTask keepingTask = suspendingQueue.take();
           String token = keepingTask.getToken();
           boolean isExpired = !keepingTask.tryKeeping();
+          if (isExpired) {
+            LOG.info("Optimizer {} has been expired, unregister it", 
keepingTask.getOptimizer());
+            unregisterOptimizer(token);
+          }
           Optional.ofNullable(keepingTask.getQueue())
               .ifPresent(
                   queue ->
                       queue
                           
.collectTasks(buildSuspendingPredication(authOptimizers.keySet()))
                           .forEach(task -> retryTask(task, queue)));
-          if (isExpired) {
-            LOG.info("Optimizer {} has been expired, unregister it", 
keepingTask.getOptimizer());
-            unregisterOptimizer(token);
-          } else {
+          if (!isExpired) {
             LOG.debug("Optimizer {} is being touched, keep it", 
keepingTask.getOptimizer());
             keepInTouch(keepingTask.getOptimizer());
           }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index 964d27c7b..52eb46b4c 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -129,42 +129,89 @@ public class OptimizingQueue extends PersistentBase {
   }
 
   private void initTableRuntime(DefaultTableRuntime tableRuntime) {
-    TableOptimizingProcess process = null;
-    if (tableRuntime.getOptimizingStatus().isProcessing() && 
tableRuntime.getProcessId() != 0) {
-      TableProcessMeta meta =
-          getAs(
-              TableProcessMapper.class,
-              mapper -> mapper.getProcessMeta(tableRuntime.getProcessId()));
-      OptimizingProcessState state =
-          getAs(
-              OptimizingProcessMapper.class,
-              mapper -> mapper.getProcessState(tableRuntime.getProcessId()));
-      process = new TableOptimizingProcess(tableRuntime, meta, state);
-      tableRuntime.recover(process);
-    }
-
-    if (tableRuntime.getOptimizingConfig().isEnabled()) {
+    try {
+      TableOptimizingProcess process = loadProcess(tableRuntime);
+
+      if (!tableRuntime.getOptimizingConfig().isEnabled()) {
+        closeProcessIfRunning(process);
+        return;
+      }
+
       tableRuntime.resetTaskQuotas(
           System.currentTimeMillis() - 
AmoroServiceConstants.QUOTA_LOOK_BACK_TIME);
-      // Close the committing process to avoid duplicate commit on the table.
-      if (tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING) {
-        if (process != null) {
-          LOG.warn(
-              "Close the committing process {} on table {}",
+
+      if (canResumeProcess(process, tableRuntime)) {
+        if (process.allTasksPrepared()) {
+          LOG.info(
+              "All tasks already completed for process {} on table {} during 
recovery,"
+                  + " triggering commit",
               process.getProcessId(),
               tableRuntime.getTableIdentifier());
-          process.close(false);
+          tableRuntime.beginCommitting();
+        } else {
+          tableQueue.offer(process);
         }
-      }
-      if (!tableRuntime.getOptimizingStatus().isProcessing()) {
+      } else {
+        resetTableForRecovery(process, tableRuntime);
         scheduler.addTable(tableRuntime);
-      } else if (process != null) {
-        tableQueue.offer(process);
-      }
-    } else {
-      if (process != null) {
-        process.close(false);
       }
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to initialize table runtime for table {}, skipping",
+          tableRuntime.getTableIdentifier(),
+          e);
+    }
+  }
+
+  private TableOptimizingProcess loadProcess(DefaultTableRuntime tableRuntime) 
{
+    if (tableRuntime.getProcessId() == 0) {
+      return null;
+    }
+    TableProcessMeta meta =
+        getAs(
+            TableProcessMapper.class, mapper -> 
mapper.getProcessMeta(tableRuntime.getProcessId()));
+    if (meta == null) {
+      return null;
+    }
+    OptimizingProcessState state =
+        getAs(
+            OptimizingProcessMapper.class,
+            mapper -> mapper.getProcessState(tableRuntime.getProcessId()));
+    if (state == null) {
+      LOG.warn(
+          "No optimizing process state found for process {} on table {}, 
skipping process recovery",
+          tableRuntime.getProcessId(),
+          tableRuntime.getTableIdentifier());
+      return null;
+    }
+    return new TableOptimizingProcess(tableRuntime, meta, state);
+  }
+
+  private boolean canResumeProcess(
+      TableOptimizingProcess process, DefaultTableRuntime tableRuntime) {
+    return process != null
+        && process.getStatus() == ProcessStatus.RUNNING
+        && tableRuntime.getOptimizingStatus().isProcessing()
+        && tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING;
+  }
+
+  private void resetTableForRecovery(
+      TableOptimizingProcess process, DefaultTableRuntime tableRuntime) {
+    closeProcessIfRunning(process);
+    if (tableRuntime.getOptimizingStatus() != OptimizingStatus.IDLE
+        && tableRuntime.getOptimizingStatus() != OptimizingStatus.PENDING) {
+      LOG.warn(
+          "Resetting table {} from {} to IDLE during recovery (process: {})",
+          tableRuntime.getTableIdentifier(),
+          tableRuntime.getOptimizingStatus(),
+          process != null ? process.getStatus() : "null");
+      tableRuntime.completeEmptyProcess();
+    }
+  }
+
+  private void closeProcessIfRunning(TableOptimizingProcess process) {
+    if (process != null && process.getStatus() == ProcessStatus.RUNNING) {
+      process.close(false);
     }
   }
 
@@ -207,6 +254,7 @@ public class OptimizingQueue extends PersistentBase {
 
   public TaskRuntime<?> pollTask(
       OptimizerThread thread, long maxWaitTime, boolean breakQuotaLimit) {
+    resetStaleTasksForThread(thread);
     long deadline = calculateDeadline(maxWaitTime);
     TaskRuntime<?> task = fetchScheduledTask(thread, true);
     while (task == null && waitTask(deadline)) {
@@ -374,6 +422,26 @@ public class OptimizingQueue extends PersistentBase {
     
findProcess(taskRuntime.getTaskId()).resetTask((TaskRuntime<RewriteStageTask>) 
taskRuntime);
   }
 
+  private void resetStaleTasksForThread(OptimizerThread thread) {
+    // Only reset ACKED tasks: if the same (token, threadId) is polling again,
+    // the executor must have finished execution (poll → ack → execute → 
complete → poll).
+    // SCHEDULED tasks are NOT reset because the executor can still ack them 
normally,
+    // even after AMS restart.
+    collectTasks(
+            task ->
+                task.getStatus() == TaskRuntime.Status.ACKED
+                    && Objects.equals(task.getToken(), thread.getToken())
+                    && task.getThreadId() == thread.getThreadId())
+        .forEach(
+            task -> {
+              LOG.warn(
+                  "Resetting stale ACKED task {} because optimizer thread {} 
is polling new task",
+                  task.getTaskId(),
+                  thread);
+              retryTask(task);
+            });
+  }
+
   public ResourceGroup getOptimizerGroup() {
     return optimizerGroup;
   }
@@ -501,9 +569,7 @@ public class OptimizingQueue extends PersistentBase {
         toSequence = processState.getToSequence();
       }
       this.status = processMeta.getStatus();
-      if (this.status != ProcessStatus.KILLED) {
-        tableRuntime.recover(this);
-      }
+      tableRuntime.recover(this);
       loadTaskRuntimes(this);
     }
 
@@ -913,6 +979,15 @@ public class OptimizingQueue extends PersistentBase {
               taskMap.put(taskRuntime.getTaskId(), taskRuntime);
               if (taskRuntime.getStatus() == TaskRuntime.Status.PLANNED) {
                 taskQueue.offer(taskRuntime);
+              } else if (taskRuntime.getStatus() == 
TaskRuntime.Status.SCHEDULED
+                  || taskRuntime.getStatus() == TaskRuntime.Status.ACKED) {
+                // Don't reset — let the optimizer finish execution and report
+                // the result. If the optimizer is dead, OptimizerKeeper will
+                // detect the stale token and reset the task via retryTask().
+                LOG.debug(
+                    "Keeping task {} in {} during recovery, waiting for 
optimizer to complete",
+                    taskRuntime.getTaskId(),
+                    taskRuntime.getStatus());
               } else if (taskRuntime.getStatus() == TaskRuntime.Status.FAILED) 
{
                 retryTask(taskRuntime);
               }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index 648532c23..baaebd278 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -103,8 +103,7 @@ public class DefaultTableRuntime extends 
AbstractTableRuntime {
   }
 
   public void recover(OptimizingProcess optimizingProcess) {
-    if (!getOptimizingStatus().isProcessing()
-        || !Objects.equals(optimizingProcess.getProcessId(), getProcessId())) {
+    if (!Objects.equals(optimizingProcess.getProcessId(), getProcessId())) {
       throw new IllegalStateException("Table runtime and processing are not 
matched!");
     }
     this.optimizingProcess = optimizingProcess;
@@ -404,24 +403,29 @@ public class DefaultTableRuntime extends 
AbstractTableRuntime {
     optimizingProcess = null;
   }
 
+  /**
+   * Resets the table to IDLE from any non-IDLE state. This is used both when 
planning determines
+   * that optimization is unnecessary (from PLANNING state) and during startup 
recovery to reset
+   * tables with unrecoverable processes (from any processing state).
+   */
   public void completeEmptyProcess() {
     OptimizingStatus originalStatus = getOptimizingStatus();
-    boolean needUpdate =
-        originalStatus == OptimizingStatus.PLANNING || originalStatus == 
OptimizingStatus.PENDING;
-    if (needUpdate) {
-      store()
-          .begin()
-          .updateStatusCode(code -> OptimizingStatus.IDLE.getCode())
-          .updateState(
-              OPTIMIZING_STATE_KEY,
-              state -> {
-                state.setLastOptimizedSnapshotId(state.getCurrentSnapshotId());
-                
state.setLastOptimizedChangeSnapshotId(state.getCurrentChangeSnapshotId());
-                return state;
-              })
-          .updateState(PENDING_INPUT_KEY, any -> new 
AbstractOptimizingEvaluator.PendingInput())
-          .commit();
+    if (originalStatus == OptimizingStatus.IDLE) {
+      return;
     }
+    store()
+        .begin()
+        .updateStatusCode(code -> OptimizingStatus.IDLE.getCode())
+        .updateState(
+            OPTIMIZING_STATE_KEY,
+            state -> {
+              state.setLastOptimizedSnapshotId(state.getCurrentSnapshotId());
+              
state.setLastOptimizedChangeSnapshotId(state.getCurrentChangeSnapshotId());
+              return state;
+            })
+        .updateState(PENDING_INPUT_KEY, any -> new 
AbstractOptimizingEvaluator.PendingInput())
+        .commit();
+    optimizingProcess = null;
   }
 
   public void optimizingNotNecessary() {
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index f9fa8ce94..2116631de 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -36,6 +36,11 @@ import org.apache.amoro.optimizing.TableOptimizing;
 import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
 import org.apache.amoro.server.optimizing.TaskRuntime;
+import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
+import org.apache.amoro.server.persistence.TableRuntimeMeta;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
+import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper;
+import org.apache.amoro.server.process.TableProcessMeta;
 import org.apache.amoro.server.resource.OptimizerInstance;
 import org.apache.amoro.server.scheduler.inline.TableRuntimeRefreshExecutor;
 import org.apache.amoro.server.table.AMSTableTestBase;
@@ -45,6 +50,7 @@ import 
org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.table.MixedTable;
 import org.apache.amoro.table.UnkeyedTable;
 import org.apache.amoro.utils.SerializationUtil;
+import org.apache.ibatis.session.SqlSession;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.data.Record;
@@ -248,7 +254,9 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
     Assertions.assertThrows(PluginRetryAuthException.class, () -> 
optimizingService().touch(token));
     Assertions.assertThrows(
         PluginRetryAuthException.class, () -> 
optimizingService().pollTask(token, THREAD_ID));
-    assertTaskStatus(TaskRuntime.Status.SCHEDULED);
+    // After optimizer expires, its tasks are immediately reset to PLANNED
+    // because unregister happens before task scan in OptimizerKeeper
+    assertTaskStatus(TaskRuntime.Status.PLANNED);
     token = optimizingService().authenticate(buildRegisterInfo());
     toucher = new Toucher();
     Thread.sleep(1000);
@@ -333,8 +341,12 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
     OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
     Assertions.assertNotNull(task);
 
+    // After reload, SCHEDULED tasks are kept as-is (not reset to PLANNED).
+    // The optimizer is still alive, so it can complete the task directly.
     reload();
     assertTaskStatus(TaskRuntime.Status.SCHEDULED);
+
+    // Complete the task with the same token (optimizer still alive)
     optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
 
     TaskRuntime taskRuntime =
@@ -345,20 +357,51 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
 
   @Test
   public void testReloadAckTask() {
-    // 1.poll task
+    // 1.poll task and ack
     OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
     Assertions.assertNotNull(task);
     optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
 
+    // After reload, ACKED tasks are kept as-is (not reset to PLANNED).
+    // The optimizer is still alive, so it can complete the task directly.
     reload();
     assertTaskStatus(TaskRuntime.Status.ACKED);
 
+    // Complete the task with the same token (optimizer still alive)
     TaskRuntime<?> taskRuntime =
         optimizingService().listTasks(defaultResourceGroup().getName()).get(0);
     optimizingService().completeTask(token, 
buildOptimizingTaskResult(task.getTaskId()));
     assertTaskCompleted(taskRuntime);
   }
 
+  @Test
+  public void testPollResetsStaleAckedTask() {
+    // 1. Poll and ack a task
+    OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+    Assertions.assertNotNull(task);
+    optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+    assertTaskStatus(TaskRuntime.Status.ACKED);
+
+    // 2. Reload (simulate AMS restart) — ACKED task is kept as-is
+    reload();
+    assertTaskStatus(TaskRuntime.Status.ACKED);
+
+    // 3. The same optimizer thread polls again — this means the executor 
finished
+    //    the old task but completeTask was lost during AMS downtime.
+    //    The stale ACKED task should be automatically reset to PLANNED,
+    //    then immediately re-polled by this same poll call.
+    OptimizingTask task2 = optimizingService().pollTask(token, THREAD_ID);
+    Assertions.assertNotNull(task2);
+    Assertions.assertEquals(task.getTaskId(), task2.getTaskId());
+
+    // 4. Complete the re-polled task normally
+    optimizingService().ackTask(token, THREAD_ID, task2.getTaskId());
+    TaskRuntime<?> taskRuntime =
+        optimizingService().listTasks(defaultResourceGroup().getName()).get(0);
+    optimizingService().completeTask(token, 
buildOptimizingTaskResult(task2.getTaskId()));
+    assertTaskCompleted(taskRuntime);
+  }
+
   @Test
   public void testReloadCompletedTask() {
     // THREAD_ID.poll task
@@ -376,6 +419,145 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
         
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
   }
 
+  @Test
+  public void testReloadAllTasksCompletedNotYetCommitting() {
+    // Simulate: AMS crashes after persisting the last task as SUCCESS
+    // but before beginCommitting() updates the table status to COMMITTING.
+    // DB state: process=RUNNING, all tasks=SUCCESS, table=*_OPTIMIZING
+
+    // 1. Complete all tasks normally — table transitions to COMMITTING
+    OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+    Assertions.assertNotNull(task);
+    optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+    optimizingService().completeTask(token, 
buildOptimizingTaskResult(task.getTaskId()));
+
+    DefaultTableRuntime runtime = 
getDefaultTableRuntime(serverTableIdentifier().getId());
+    Assertions.assertEquals(OptimizingStatus.COMMITTING, 
runtime.getOptimizingStatus());
+
+    // 2. Revert table status in DB to *_OPTIMIZING (simulate crash before 
beginCommitting)
+    long tableId = serverTableIdentifier().getId();
+    updateTableStatusInDb(tableId, OptimizingStatus.MINOR_OPTIMIZING);
+
+    // 3. Reload (simulate AMS restart)
+    reload();
+
+    // 4. During recovery, all tasks are SUCCESS so beginCommitting() should 
be triggered
+    Assertions.assertEquals(
+        OptimizingStatus.COMMITTING,
+        
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
+  }
+
+  @Test
+  public void testReloadPlanningWithOrphanedProcess() {
+    // 1. Poll and ack a task - table is now in optimizing state with an 
active process
+    OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+    Assertions.assertNotNull(task);
+    optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+    assertTaskStatus(TaskRuntime.Status.ACKED);
+
+    // 2. Simulate table status being PLANNING while process is still active
+    // This can happen when AMS crashes during a planning transition
+    getDefaultTableRuntime(serverTableIdentifier().getId()).beginPlanning();
+    Assertions.assertEquals(
+        OptimizingStatus.PLANNING,
+        
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
+
+    // 3. Reload (simulate AMS restart)
+    reload();
+
+    // 4. Orphaned process should be closed, table should transition to IDLE
+    Assertions.assertNull(
+        
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingProcess());
+    Assertions.assertEquals(
+        OptimizingStatus.IDLE,
+        
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
+  }
+
+  @Test
+  public void testReloadOptimizingWithFailedProcess() {
+    // Simulate: table is *_OPTIMIZING but process is FAILED in DB
+    // Before fix: table stuck in tableQueue (poll blocked for FAILED process)
+    OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+    Assertions.assertNotNull(task);
+    optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+
+    // Table should be in *_OPTIMIZING with a RUNNING process
+    DefaultTableRuntime runtime = 
getDefaultTableRuntime(serverTableIdentifier().getId());
+    Assertions.assertTrue(runtime.getOptimizingStatus().isProcessing());
+    Assertions.assertNotNull(runtime.getOptimizingProcess());
+
+    // Directly update process status to FAILED in DB to simulate crash after 
process failure
+    long processId = runtime.getProcessId();
+    long tableId = serverTableIdentifier().getId();
+    updateProcessStatusInDb(tableId, processId, ProcessStatus.FAILED);
+
+    // Reload (simulate AMS restart)
+    reload();
+
+    // Table should be reset to IDLE and added to scheduler
+    Assertions.assertNull(
+        
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingProcess());
+    Assertions.assertEquals(
+        OptimizingStatus.IDLE,
+        
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
+  }
+
+  @Test
+  public void testReloadCommittingWithFailedProcess() {
+    // Simulate: table is COMMITTING but process is FAILED in DB
+    // Before fix: table became a ghost (not in scheduler or tableQueue)
+    OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+    Assertions.assertNotNull(task);
+    optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+    optimizingService().completeTask(token, 
buildOptimizingTaskResult(task.getTaskId()));
+
+    // Table should be in COMMITTING state
+    DefaultTableRuntime runtime = 
getDefaultTableRuntime(serverTableIdentifier().getId());
+    Assertions.assertEquals(OptimizingStatus.COMMITTING, 
runtime.getOptimizingStatus());
+
+    // Directly update process status to FAILED in DB
+    long processId = runtime.getProcessId();
+    long tableId = serverTableIdentifier().getId();
+    updateProcessStatusInDb(tableId, processId, ProcessStatus.FAILED);
+
+    // Reload (simulate AMS restart)
+    reload();
+
+    // Table should be reset to IDLE
+    Assertions.assertNull(
+        
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingProcess());
+    Assertions.assertEquals(
+        OptimizingStatus.IDLE,
+        
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
+  }
+
+  @Test
+  public void testReloadOptimizingWithNoProcessRecord() {
+    // Simulate: table is *_OPTIMIZING but process record is missing from DB
+    // Before fix: table became a ghost (not in scheduler or tableQueue)
+    OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+    Assertions.assertNotNull(task);
+    optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+
+    DefaultTableRuntime runtime = 
getDefaultTableRuntime(serverTableIdentifier().getId());
+    Assertions.assertTrue(runtime.getOptimizingStatus().isProcessing());
+
+    // Delete process record from DB to simulate missing process
+    long processId = runtime.getProcessId();
+    long tableId = serverTableIdentifier().getId();
+    deleteProcessFromDb(tableId, processId);
+
+    // Reload (simulate AMS restart)
+    reload();
+
+    // Table should be reset to IDLE
+    Assertions.assertNull(
+        
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingProcess());
+    Assertions.assertEquals(
+        OptimizingStatus.IDLE,
+        
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
+  }
+
   @Test
   public void testReloadFailedTask() {
     // 1.poll task
@@ -442,6 +624,40 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
     return optimizingTaskResult;
   }
 
+  private void updateProcessStatusInDb(long tableId, long processId, 
ProcessStatus status) {
+    try (SqlSession session = 
SqlSessionFactoryProvider.getInstance().get().openSession(true)) {
+      TableProcessMapper mapper = session.getMapper(TableProcessMapper.class);
+      TableProcessMeta meta = mapper.getProcessMeta(processId);
+      mapper.updateProcess(
+          tableId,
+          processId,
+          meta.getExternalProcessIdentifier(),
+          status,
+          meta.getProcessStage(),
+          meta.getRetryNumber(),
+          System.currentTimeMillis(),
+          "simulated failure",
+          meta.getProcessParameters(),
+          meta.getSummary());
+    }
+  }
+
+  private void updateTableStatusInDb(long tableId, OptimizingStatus status) {
+    try (SqlSession session = 
SqlSessionFactoryProvider.getInstance().get().openSession(true)) {
+      TableRuntimeMapper mapper = session.getMapper(TableRuntimeMapper.class);
+      TableRuntimeMeta meta = mapper.selectRuntime(tableId);
+      meta.setStatusCode(status.getCode());
+      mapper.updateRuntime(meta);
+    }
+  }
+
+  private void deleteProcessFromDb(long tableId, long processId) {
+    try (SqlSession session = 
SqlSessionFactoryProvider.getInstance().get().openSession(true)) {
+      TableProcessMapper mapper = session.getMapper(TableProcessMapper.class);
+      mapper.deleteBefore(tableId, processId);
+    }
+  }
+
   private void assertTaskStatus(TaskRuntime.Status expectedStatus) {
     Assertions.assertEquals(
         expectedStatus,
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
index 342b533aa..ddf8fe985 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
@@ -91,6 +91,15 @@ public class TestOptimizingQueue extends AMSTableTestBase {
         }
       };
 
+  private final OptimizerThread optimizerThread2 =
+      new OptimizerThread(2, null) {
+
+        @Override
+        public String getToken() {
+          return "aah";
+        }
+      };
+
   public TestOptimizingQueue(CatalogTestHelper catalogTestHelper, 
TableTestHelper tableTestHelper) {
     super(catalogTestHelper, tableTestHelper, true);
   }
@@ -188,7 +197,8 @@ public class TestOptimizingQueue extends AMSTableTestBase {
         1, queue.collectTasks(t -> t.getStatus() == 
TaskRuntime.Status.ACKED).size());
     Assert.assertNotNull(task);
 
-    TaskRuntime<?> task2 = queue.pollTask(optimizerThread, MAX_POLLING_TIME, 
false);
+    // Use a different thread to avoid resetStaleTasksForThread resetting the 
ACKED task
+    TaskRuntime<?> task2 = queue.pollTask(optimizerThread2, MAX_POLLING_TIME, 
false);
     Assert.assertNull(task2);
 
     queue.completeTask(
@@ -222,16 +232,17 @@ public class TestOptimizingQueue extends AMSTableTestBase 
{
         1, queue.collectTasks(t -> t.getStatus() == 
TaskRuntime.Status.ACKED).size());
     Assert.assertNotNull(task);
 
-    TaskRuntime<?> task2 = queue.pollTask(optimizerThread, MAX_POLLING_TIME, 
true);
+    // Use a different thread to avoid resetStaleTasksForThread resetting the 
ACKED task
+    TaskRuntime<?> task2 = queue.pollTask(optimizerThread2, MAX_POLLING_TIME, 
true);
     Assert.assertNotNull(task2);
 
     queue.completeTask(
         optimizerThread,
         buildOptimizingTaskResult(task.getTaskId(), 
optimizerThread.getThreadId()));
     Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
-    TaskRuntime<?> task4 = queue.pollTask(optimizerThread, MAX_POLLING_TIME, 
false);
+    TaskRuntime<?> task4 = queue.pollTask(optimizerThread2, MAX_POLLING_TIME, 
false);
     Assert.assertNull(task4);
-    TaskRuntime<?> retryTask = queue.pollTask(optimizerThread, 
MAX_POLLING_TIME, true);
+    TaskRuntime<?> retryTask = queue.pollTask(optimizerThread2, 
MAX_POLLING_TIME, true);
     Assert.assertNotNull(retryTask);
     queue.dispose();
   }

Reply via email to