This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 191414520 [AMORO-4089] Fix optimizing process stuck permanently after
AMS restart (#4090)
191414520 is described below
commit 19141452088c891c85ad5fc88f2c2b0837b9bf9c
Author: Jiwon Park <[email protected]>
AuthorDate: Wed Mar 4 20:50:45 2026 +0900
[AMORO-4089] Fix optimizing process stuck permanently after AMS restart
(#4090)
* [AMORO-4089] Fix optimizing process permanently stuck after AMS restart
Fix multiple scenarios where tables become permanently stuck after an
AMS restart by refactoring the initTableRuntime() recovery logic, fixing
the OptimizerKeeper race condition, and adding defensive measures.
Signed-off-by: Jiwon Park <[email protected]>
* [AMORO-4089] Skip resetting PENDING tables during recovery
PENDING tables (processId=0) should not be reset via
completeEmptyProcess(), because that call incorrectly marks existing
snapshots as already optimized. Simply re-add them to the scheduler instead.
Also use separate optimizer threads in quota tests to avoid
interference with resetStaleTasksForThread.
Signed-off-by: Jiwon Park <[email protected]>
* [AMORO-4089] Avoid adding completed process to tableQueue during recovery
When all tasks are already completed during recovery, skip adding the
process to tableQueue and directly trigger beginCommitting() to prevent
an empty shell process from lingering in the queue.
Signed-off-by: Jiwon Park <[email protected]>
* [AMORO-4089] Add Javadoc to completeEmptyProcess() documenting broader
scope
Document that completeEmptyProcess() resets the table from any non-IDLE
state, not just PLANNING/PENDING, as it is now also used during startup
recovery for tables with unrecoverable processes.
Signed-off-by: Jiwon Park <[email protected]>
* [AMORO-4089] Remove redundant KILLED status guard in recovery constructor
canResumeProcess() already filters out KILLED processes, so the
status != KILLED guard in the recovery constructor is unnecessary.
Remove it to clean up legacy code as part of the refactoring.
Signed-off-by: Jiwon Park <[email protected]>
* [AMORO-4089] Lower recovery log level for SCHEDULED/ACKED tasks to DEBUG
Transient recovery state logs can be noisy in clusters with many tables.
Signed-off-by: Jiwon Park <[email protected]>
---------
Signed-off-by: Jiwon Park <[email protected]>
Co-authored-by: Xu Bai <[email protected]>
---
.../amoro/server/DefaultOptimizingService.java | 9 +-
.../amoro/server/optimizing/OptimizingQueue.java | 139 ++++++++++---
.../amoro/server/table/DefaultTableRuntime.java | 38 ++--
.../amoro/server/TestDefaultOptimizingService.java | 220 ++++++++++++++++++++-
.../server/optimizing/TestOptimizingQueue.java | 19 +-
5 files changed, 366 insertions(+), 59 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index f3b13e69a..9be63dbaf 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -490,16 +490,17 @@ public class DefaultOptimizingService extends
StatedPersistentBase
OptimizerKeepingTask keepingTask = suspendingQueue.take();
String token = keepingTask.getToken();
boolean isExpired = !keepingTask.tryKeeping();
+ if (isExpired) {
+ LOG.info("Optimizer {} has been expired, unregister it",
keepingTask.getOptimizer());
+ unregisterOptimizer(token);
+ }
Optional.ofNullable(keepingTask.getQueue())
.ifPresent(
queue ->
queue
.collectTasks(buildSuspendingPredication(authOptimizers.keySet()))
.forEach(task -> retryTask(task, queue)));
- if (isExpired) {
- LOG.info("Optimizer {} has been expired, unregister it",
keepingTask.getOptimizer());
- unregisterOptimizer(token);
- } else {
+ if (!isExpired) {
LOG.debug("Optimizer {} is being touched, keep it",
keepingTask.getOptimizer());
keepInTouch(keepingTask.getOptimizer());
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index 964d27c7b..52eb46b4c 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -129,42 +129,89 @@ public class OptimizingQueue extends PersistentBase {
}
private void initTableRuntime(DefaultTableRuntime tableRuntime) {
- TableOptimizingProcess process = null;
- if (tableRuntime.getOptimizingStatus().isProcessing() &&
tableRuntime.getProcessId() != 0) {
- TableProcessMeta meta =
- getAs(
- TableProcessMapper.class,
- mapper -> mapper.getProcessMeta(tableRuntime.getProcessId()));
- OptimizingProcessState state =
- getAs(
- OptimizingProcessMapper.class,
- mapper -> mapper.getProcessState(tableRuntime.getProcessId()));
- process = new TableOptimizingProcess(tableRuntime, meta, state);
- tableRuntime.recover(process);
- }
-
- if (tableRuntime.getOptimizingConfig().isEnabled()) {
+ try {
+ TableOptimizingProcess process = loadProcess(tableRuntime);
+
+ if (!tableRuntime.getOptimizingConfig().isEnabled()) {
+ closeProcessIfRunning(process);
+ return;
+ }
+
tableRuntime.resetTaskQuotas(
System.currentTimeMillis() -
AmoroServiceConstants.QUOTA_LOOK_BACK_TIME);
- // Close the committing process to avoid duplicate commit on the table.
- if (tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING) {
- if (process != null) {
- LOG.warn(
- "Close the committing process {} on table {}",
+
+ if (canResumeProcess(process, tableRuntime)) {
+ if (process.allTasksPrepared()) {
+ LOG.info(
+ "All tasks already completed for process {} on table {} during
recovery,"
+ + " triggering commit",
process.getProcessId(),
tableRuntime.getTableIdentifier());
- process.close(false);
+ tableRuntime.beginCommitting();
+ } else {
+ tableQueue.offer(process);
}
- }
- if (!tableRuntime.getOptimizingStatus().isProcessing()) {
+ } else {
+ resetTableForRecovery(process, tableRuntime);
scheduler.addTable(tableRuntime);
- } else if (process != null) {
- tableQueue.offer(process);
- }
- } else {
- if (process != null) {
- process.close(false);
}
+ } catch (Exception e) {
+ LOG.error(
+ "Failed to initialize table runtime for table {}, skipping",
+ tableRuntime.getTableIdentifier(),
+ e);
+ }
+ }
+
+ private TableOptimizingProcess loadProcess(DefaultTableRuntime tableRuntime)
{
+ if (tableRuntime.getProcessId() == 0) {
+ return null;
+ }
+ TableProcessMeta meta =
+ getAs(
+ TableProcessMapper.class, mapper ->
mapper.getProcessMeta(tableRuntime.getProcessId()));
+ if (meta == null) {
+ return null;
+ }
+ OptimizingProcessState state =
+ getAs(
+ OptimizingProcessMapper.class,
+ mapper -> mapper.getProcessState(tableRuntime.getProcessId()));
+ if (state == null) {
+ LOG.warn(
+ "No optimizing process state found for process {} on table {},
skipping process recovery",
+ tableRuntime.getProcessId(),
+ tableRuntime.getTableIdentifier());
+ return null;
+ }
+ return new TableOptimizingProcess(tableRuntime, meta, state);
+ }
+
+ private boolean canResumeProcess(
+ TableOptimizingProcess process, DefaultTableRuntime tableRuntime) {
+ return process != null
+ && process.getStatus() == ProcessStatus.RUNNING
+ && tableRuntime.getOptimizingStatus().isProcessing()
+ && tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING;
+ }
+
+ private void resetTableForRecovery(
+ TableOptimizingProcess process, DefaultTableRuntime tableRuntime) {
+ closeProcessIfRunning(process);
+ if (tableRuntime.getOptimizingStatus() != OptimizingStatus.IDLE
+ && tableRuntime.getOptimizingStatus() != OptimizingStatus.PENDING) {
+ LOG.warn(
+ "Resetting table {} from {} to IDLE during recovery (process: {})",
+ tableRuntime.getTableIdentifier(),
+ tableRuntime.getOptimizingStatus(),
+ process != null ? process.getStatus() : "null");
+ tableRuntime.completeEmptyProcess();
+ }
+ }
+
+ private void closeProcessIfRunning(TableOptimizingProcess process) {
+ if (process != null && process.getStatus() == ProcessStatus.RUNNING) {
+ process.close(false);
}
}
@@ -207,6 +254,7 @@ public class OptimizingQueue extends PersistentBase {
public TaskRuntime<?> pollTask(
OptimizerThread thread, long maxWaitTime, boolean breakQuotaLimit) {
+ resetStaleTasksForThread(thread);
long deadline = calculateDeadline(maxWaitTime);
TaskRuntime<?> task = fetchScheduledTask(thread, true);
while (task == null && waitTask(deadline)) {
@@ -374,6 +422,26 @@ public class OptimizingQueue extends PersistentBase {
findProcess(taskRuntime.getTaskId()).resetTask((TaskRuntime<RewriteStageTask>)
taskRuntime);
}
+ private void resetStaleTasksForThread(OptimizerThread thread) {
+ // Only reset ACKED tasks: if the same (token, threadId) is polling again,
+ // the executor must have finished execution (poll → ack → execute →
complete → poll).
+ // SCHEDULED tasks are NOT reset because the executor can still ack them
normally,
+ // even after AMS restart.
+ collectTasks(
+ task ->
+ task.getStatus() == TaskRuntime.Status.ACKED
+ && Objects.equals(task.getToken(), thread.getToken())
+ && task.getThreadId() == thread.getThreadId())
+ .forEach(
+ task -> {
+ LOG.warn(
+ "Resetting stale ACKED task {} because optimizer thread {}
is polling new task",
+ task.getTaskId(),
+ thread);
+ retryTask(task);
+ });
+ }
+
public ResourceGroup getOptimizerGroup() {
return optimizerGroup;
}
@@ -501,9 +569,7 @@ public class OptimizingQueue extends PersistentBase {
toSequence = processState.getToSequence();
}
this.status = processMeta.getStatus();
- if (this.status != ProcessStatus.KILLED) {
- tableRuntime.recover(this);
- }
+ tableRuntime.recover(this);
loadTaskRuntimes(this);
}
@@ -913,6 +979,15 @@ public class OptimizingQueue extends PersistentBase {
taskMap.put(taskRuntime.getTaskId(), taskRuntime);
if (taskRuntime.getStatus() == TaskRuntime.Status.PLANNED) {
taskQueue.offer(taskRuntime);
+ } else if (taskRuntime.getStatus() ==
TaskRuntime.Status.SCHEDULED
+ || taskRuntime.getStatus() == TaskRuntime.Status.ACKED) {
+ // Don't reset — let the optimizer finish execution and report
+ // the result. If the optimizer is dead, OptimizerKeeper will
+ // detect the stale token and reset the task via retryTask().
+ LOG.debug(
+ "Keeping task {} in {} during recovery, waiting for
optimizer to complete",
+ taskRuntime.getTaskId(),
+ taskRuntime.getStatus());
} else if (taskRuntime.getStatus() == TaskRuntime.Status.FAILED)
{
retryTask(taskRuntime);
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index 648532c23..baaebd278 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -103,8 +103,7 @@ public class DefaultTableRuntime extends
AbstractTableRuntime {
}
public void recover(OptimizingProcess optimizingProcess) {
- if (!getOptimizingStatus().isProcessing()
- || !Objects.equals(optimizingProcess.getProcessId(), getProcessId())) {
+ if (!Objects.equals(optimizingProcess.getProcessId(), getProcessId())) {
throw new IllegalStateException("Table runtime and processing are not
matched!");
}
this.optimizingProcess = optimizingProcess;
@@ -404,24 +403,29 @@ public class DefaultTableRuntime extends
AbstractTableRuntime {
optimizingProcess = null;
}
+ /**
+ * Resets the table to IDLE from any non-IDLE state. This is used both when
planning determines
+ * that optimization is unnecessary (from PLANNING state) and during startup
recovery to reset
+ * tables with unrecoverable processes (from any processing state).
+ */
public void completeEmptyProcess() {
OptimizingStatus originalStatus = getOptimizingStatus();
- boolean needUpdate =
- originalStatus == OptimizingStatus.PLANNING || originalStatus ==
OptimizingStatus.PENDING;
- if (needUpdate) {
- store()
- .begin()
- .updateStatusCode(code -> OptimizingStatus.IDLE.getCode())
- .updateState(
- OPTIMIZING_STATE_KEY,
- state -> {
- state.setLastOptimizedSnapshotId(state.getCurrentSnapshotId());
-
state.setLastOptimizedChangeSnapshotId(state.getCurrentChangeSnapshotId());
- return state;
- })
- .updateState(PENDING_INPUT_KEY, any -> new
AbstractOptimizingEvaluator.PendingInput())
- .commit();
+ if (originalStatus == OptimizingStatus.IDLE) {
+ return;
}
+ store()
+ .begin()
+ .updateStatusCode(code -> OptimizingStatus.IDLE.getCode())
+ .updateState(
+ OPTIMIZING_STATE_KEY,
+ state -> {
+ state.setLastOptimizedSnapshotId(state.getCurrentSnapshotId());
+
state.setLastOptimizedChangeSnapshotId(state.getCurrentChangeSnapshotId());
+ return state;
+ })
+ .updateState(PENDING_INPUT_KEY, any -> new
AbstractOptimizingEvaluator.PendingInput())
+ .commit();
+ optimizingProcess = null;
}
public void optimizingNotNecessary() {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index f9fa8ce94..2116631de 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -36,6 +36,11 @@ import org.apache.amoro.optimizing.TableOptimizing;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
+import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
+import org.apache.amoro.server.persistence.TableRuntimeMeta;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
+import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper;
+import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.scheduler.inline.TableRuntimeRefreshExecutor;
import org.apache.amoro.server.table.AMSTableTestBase;
@@ -45,6 +50,7 @@ import
org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.UnkeyedTable;
import org.apache.amoro.utils.SerializationUtil;
+import org.apache.ibatis.session.SqlSession;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.data.Record;
@@ -248,7 +254,9 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
Assertions.assertThrows(PluginRetryAuthException.class, () ->
optimizingService().touch(token));
Assertions.assertThrows(
PluginRetryAuthException.class, () ->
optimizingService().pollTask(token, THREAD_ID));
- assertTaskStatus(TaskRuntime.Status.SCHEDULED);
+ // After optimizer expires, its tasks are immediately reset to PLANNED
+ // because unregister happens before task scan in OptimizerKeeper
+ assertTaskStatus(TaskRuntime.Status.PLANNED);
token = optimizingService().authenticate(buildRegisterInfo());
toucher = new Toucher();
Thread.sleep(1000);
@@ -333,8 +341,12 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
Assertions.assertNotNull(task);
+ // After reload, SCHEDULED tasks are kept as-is (not reset to PLANNED).
+ // The optimizer is still alive, so it can complete the task directly.
reload();
assertTaskStatus(TaskRuntime.Status.SCHEDULED);
+
+ // Complete the task with the same token (optimizer still alive)
optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
TaskRuntime taskRuntime =
@@ -345,20 +357,51 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
@Test
public void testReloadAckTask() {
- // 1.poll task
+ // 1.poll task and ack
OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
Assertions.assertNotNull(task);
optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+ // After reload, ACKED tasks are kept as-is (not reset to PLANNED).
+ // The optimizer is still alive, so it can complete the task directly.
reload();
assertTaskStatus(TaskRuntime.Status.ACKED);
+ // Complete the task with the same token (optimizer still alive)
TaskRuntime<?> taskRuntime =
optimizingService().listTasks(defaultResourceGroup().getName()).get(0);
optimizingService().completeTask(token,
buildOptimizingTaskResult(task.getTaskId()));
assertTaskCompleted(taskRuntime);
}
+ @Test
+ public void testPollResetsStaleAckedTask() {
+ // 1. Poll and ack a task
+ OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+ Assertions.assertNotNull(task);
+ optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+ assertTaskStatus(TaskRuntime.Status.ACKED);
+
+ // 2. Reload (simulate AMS restart) — ACKED task is kept as-is
+ reload();
+ assertTaskStatus(TaskRuntime.Status.ACKED);
+
+ // 3. The same optimizer thread polls again — this means the executor
finished
+ // the old task but completeTask was lost during AMS downtime.
+ // The stale ACKED task should be automatically reset to PLANNED,
+ // then immediately re-polled by this same poll call.
+ OptimizingTask task2 = optimizingService().pollTask(token, THREAD_ID);
+ Assertions.assertNotNull(task2);
+ Assertions.assertEquals(task.getTaskId(), task2.getTaskId());
+
+ // 4. Complete the re-polled task normally
+ optimizingService().ackTask(token, THREAD_ID, task2.getTaskId());
+ TaskRuntime<?> taskRuntime =
+ optimizingService().listTasks(defaultResourceGroup().getName()).get(0);
+ optimizingService().completeTask(token,
buildOptimizingTaskResult(task2.getTaskId()));
+ assertTaskCompleted(taskRuntime);
+ }
+
@Test
public void testReloadCompletedTask() {
// THREAD_ID.poll task
@@ -376,6 +419,145 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
}
+ @Test
+ public void testReloadAllTasksCompletedNotYetCommitting() {
+ // Simulate: AMS crashes after persisting the last task as SUCCESS
+ // but before beginCommitting() updates the table status to COMMITTING.
+ // DB state: process=RUNNING, all tasks=SUCCESS, table=*_OPTIMIZING
+
+ // 1. Complete all tasks normally — table transitions to COMMITTING
+ OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+ Assertions.assertNotNull(task);
+ optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+ optimizingService().completeTask(token,
buildOptimizingTaskResult(task.getTaskId()));
+
+ DefaultTableRuntime runtime =
getDefaultTableRuntime(serverTableIdentifier().getId());
+ Assertions.assertEquals(OptimizingStatus.COMMITTING,
runtime.getOptimizingStatus());
+
+ // 2. Revert table status in DB to *_OPTIMIZING (simulate crash before
beginCommitting)
+ long tableId = serverTableIdentifier().getId();
+ updateTableStatusInDb(tableId, OptimizingStatus.MINOR_OPTIMIZING);
+
+ // 3. Reload (simulate AMS restart)
+ reload();
+
+ // 4. During recovery, all tasks are SUCCESS so beginCommitting() should
be triggered
+ Assertions.assertEquals(
+ OptimizingStatus.COMMITTING,
+
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
+ }
+
+ @Test
+ public void testReloadPlanningWithOrphanedProcess() {
+ // 1. Poll and ack a task - table is now in optimizing state with an
active process
+ OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+ Assertions.assertNotNull(task);
+ optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+ assertTaskStatus(TaskRuntime.Status.ACKED);
+
+ // 2. Simulate table status being PLANNING while process is still active
+ // This can happen when AMS crashes during a planning transition
+ getDefaultTableRuntime(serverTableIdentifier().getId()).beginPlanning();
+ Assertions.assertEquals(
+ OptimizingStatus.PLANNING,
+
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
+
+ // 3. Reload (simulate AMS restart)
+ reload();
+
+ // 4. Orphaned process should be closed, table should transition to IDLE
+ Assertions.assertNull(
+
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingProcess());
+ Assertions.assertEquals(
+ OptimizingStatus.IDLE,
+
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
+ }
+
+ @Test
+ public void testReloadOptimizingWithFailedProcess() {
+ // Simulate: table is *_OPTIMIZING but process is FAILED in DB
+ // Before fix: table stuck in tableQueue (poll blocked for FAILED process)
+ OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+ Assertions.assertNotNull(task);
+ optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+
+ // Table should be in *_OPTIMIZING with a RUNNING process
+ DefaultTableRuntime runtime =
getDefaultTableRuntime(serverTableIdentifier().getId());
+ Assertions.assertTrue(runtime.getOptimizingStatus().isProcessing());
+ Assertions.assertNotNull(runtime.getOptimizingProcess());
+
+ // Directly update process status to FAILED in DB to simulate crash after
process failure
+ long processId = runtime.getProcessId();
+ long tableId = serverTableIdentifier().getId();
+ updateProcessStatusInDb(tableId, processId, ProcessStatus.FAILED);
+
+ // Reload (simulate AMS restart)
+ reload();
+
+ // Table should be reset to IDLE and added to scheduler
+ Assertions.assertNull(
+
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingProcess());
+ Assertions.assertEquals(
+ OptimizingStatus.IDLE,
+
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
+ }
+
+ @Test
+ public void testReloadCommittingWithFailedProcess() {
+ // Simulate: table is COMMITTING but process is FAILED in DB
+ // Before fix: table became a ghost (not in scheduler or tableQueue)
+ OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+ Assertions.assertNotNull(task);
+ optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+ optimizingService().completeTask(token,
buildOptimizingTaskResult(task.getTaskId()));
+
+ // Table should be in COMMITTING state
+ DefaultTableRuntime runtime =
getDefaultTableRuntime(serverTableIdentifier().getId());
+ Assertions.assertEquals(OptimizingStatus.COMMITTING,
runtime.getOptimizingStatus());
+
+ // Directly update process status to FAILED in DB
+ long processId = runtime.getProcessId();
+ long tableId = serverTableIdentifier().getId();
+ updateProcessStatusInDb(tableId, processId, ProcessStatus.FAILED);
+
+ // Reload (simulate AMS restart)
+ reload();
+
+ // Table should be reset to IDLE
+ Assertions.assertNull(
+
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingProcess());
+ Assertions.assertEquals(
+ OptimizingStatus.IDLE,
+
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
+ }
+
+ @Test
+ public void testReloadOptimizingWithNoProcessRecord() {
+ // Simulate: table is *_OPTIMIZING but process record is missing from DB
+ // Before fix: table became a ghost (not in scheduler or tableQueue)
+ OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+ Assertions.assertNotNull(task);
+ optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+
+ DefaultTableRuntime runtime =
getDefaultTableRuntime(serverTableIdentifier().getId());
+ Assertions.assertTrue(runtime.getOptimizingStatus().isProcessing());
+
+ // Delete process record from DB to simulate missing process
+ long processId = runtime.getProcessId();
+ long tableId = serverTableIdentifier().getId();
+ deleteProcessFromDb(tableId, processId);
+
+ // Reload (simulate AMS restart)
+ reload();
+
+ // Table should be reset to IDLE
+ Assertions.assertNull(
+
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingProcess());
+ Assertions.assertEquals(
+ OptimizingStatus.IDLE,
+
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
+ }
+
@Test
public void testReloadFailedTask() {
// 1.poll task
@@ -442,6 +624,40 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
return optimizingTaskResult;
}
+ private void updateProcessStatusInDb(long tableId, long processId,
ProcessStatus status) {
+ try (SqlSession session =
SqlSessionFactoryProvider.getInstance().get().openSession(true)) {
+ TableProcessMapper mapper = session.getMapper(TableProcessMapper.class);
+ TableProcessMeta meta = mapper.getProcessMeta(processId);
+ mapper.updateProcess(
+ tableId,
+ processId,
+ meta.getExternalProcessIdentifier(),
+ status,
+ meta.getProcessStage(),
+ meta.getRetryNumber(),
+ System.currentTimeMillis(),
+ "simulated failure",
+ meta.getProcessParameters(),
+ meta.getSummary());
+ }
+ }
+
+ private void updateTableStatusInDb(long tableId, OptimizingStatus status) {
+ try (SqlSession session =
SqlSessionFactoryProvider.getInstance().get().openSession(true)) {
+ TableRuntimeMapper mapper = session.getMapper(TableRuntimeMapper.class);
+ TableRuntimeMeta meta = mapper.selectRuntime(tableId);
+ meta.setStatusCode(status.getCode());
+ mapper.updateRuntime(meta);
+ }
+ }
+
+ private void deleteProcessFromDb(long tableId, long processId) {
+ try (SqlSession session =
SqlSessionFactoryProvider.getInstance().get().openSession(true)) {
+ TableProcessMapper mapper = session.getMapper(TableProcessMapper.class);
+ mapper.deleteBefore(tableId, processId);
+ }
+ }
+
private void assertTaskStatus(TaskRuntime.Status expectedStatus) {
Assertions.assertEquals(
expectedStatus,
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
index 342b533aa..ddf8fe985 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
@@ -91,6 +91,15 @@ public class TestOptimizingQueue extends AMSTableTestBase {
}
};
+ private final OptimizerThread optimizerThread2 =
+ new OptimizerThread(2, null) {
+
+ @Override
+ public String getToken() {
+ return "aah";
+ }
+ };
+
public TestOptimizingQueue(CatalogTestHelper catalogTestHelper,
TableTestHelper tableTestHelper) {
super(catalogTestHelper, tableTestHelper, true);
}
@@ -188,7 +197,8 @@ public class TestOptimizingQueue extends AMSTableTestBase {
1, queue.collectTasks(t -> t.getStatus() ==
TaskRuntime.Status.ACKED).size());
Assert.assertNotNull(task);
- TaskRuntime<?> task2 = queue.pollTask(optimizerThread, MAX_POLLING_TIME,
false);
+ // Use a different thread to avoid resetStaleTasksForThread resetting the
ACKED task
+ TaskRuntime<?> task2 = queue.pollTask(optimizerThread2, MAX_POLLING_TIME,
false);
Assert.assertNull(task2);
queue.completeTask(
@@ -222,16 +232,17 @@ public class TestOptimizingQueue extends AMSTableTestBase
{
1, queue.collectTasks(t -> t.getStatus() ==
TaskRuntime.Status.ACKED).size());
Assert.assertNotNull(task);
- TaskRuntime<?> task2 = queue.pollTask(optimizerThread, MAX_POLLING_TIME,
true);
+ // Use a different thread to avoid resetStaleTasksForThread resetting the
ACKED task
+ TaskRuntime<?> task2 = queue.pollTask(optimizerThread2, MAX_POLLING_TIME,
true);
Assert.assertNotNull(task2);
queue.completeTask(
optimizerThread,
buildOptimizingTaskResult(task.getTaskId(),
optimizerThread.getThreadId()));
Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
- TaskRuntime<?> task4 = queue.pollTask(optimizerThread, MAX_POLLING_TIME,
false);
+ TaskRuntime<?> task4 = queue.pollTask(optimizerThread2, MAX_POLLING_TIME,
false);
Assert.assertNull(task4);
- TaskRuntime<?> retryTask = queue.pollTask(optimizerThread,
MAX_POLLING_TIME, true);
+ TaskRuntime<?> retryTask = queue.pollTask(optimizerThread2,
MAX_POLLING_TIME, true);
Assert.assertNotNull(retryTask);
queue.dispose();
}