lintingbin commented on code in PR #4090:
URL: https://github.com/apache/amoro/pull/4090#discussion_r2852487023


##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java:
##########
@@ -129,42 +129,88 @@ public OptimizingQueue(
   }
 
   private void initTableRuntime(DefaultTableRuntime tableRuntime) {
-    TableOptimizingProcess process = null;
-    if (tableRuntime.getOptimizingStatus().isProcessing() && tableRuntime.getProcessId() != 0) {
-      TableProcessMeta meta =
-          getAs(
-              TableProcessMapper.class,
-              mapper -> mapper.getProcessMeta(tableRuntime.getProcessId()));
-      OptimizingProcessState state =
-          getAs(
-              OptimizingProcessMapper.class,
-              mapper -> mapper.getProcessState(tableRuntime.getProcessId()));
-      process = new TableOptimizingProcess(tableRuntime, meta, state);
-      tableRuntime.recover(process);
-    }
-
-    if (tableRuntime.getOptimizingConfig().isEnabled()) {
+    try {
+      TableOptimizingProcess process = loadProcess(tableRuntime);
+
+      if (!tableRuntime.getOptimizingConfig().isEnabled()) {
+        closeProcessIfRunning(process);
+        return;
+      }
+
       tableRuntime.resetTaskQuotas(
           System.currentTimeMillis() - AmoroServiceConstants.QUOTA_LOOK_BACK_TIME);
-      // Close the committing process to avoid duplicate commit on the table.
-      if (tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING) {
-        if (process != null) {
-          LOG.warn(
-              "Close the committing process {} on table {}",
+
+      if (canResumeProcess(process, tableRuntime)) {
+        tableQueue.offer(process);
+        if (process.allTasksPrepared()) {
+          LOG.info(
+              "All tasks already completed for process {} on table {} during recovery,"
+                  + " triggering commit",
               process.getProcessId(),
               tableRuntime.getTableIdentifier());
-          process.close(false);
+          tableRuntime.beginCommitting();

Review Comment:
   **Nit: Process left in `tableQueue` with no remaining tasks**
   
   When `allTasksPrepared()` is true and `beginCommitting()` is called, the process has already been added to `tableQueue` (line above), but its `taskQueue` is empty because all tasks are `SUCCESS`. This "empty shell" process will linger in `tableQueue`:
   - `pollTask()` will iterate over it on every poll but never get a task from it.
   - It won't be cleaned up until `clearProcess()` is called after a commit.
   
   Consider either not adding it to `tableQueue` when all tasks are already completed, or removing it after triggering `beginCommitting()`:
   
   ```java
   if (process.allTasksPrepared()) {
       tableQueue.remove(process);
       // ...
       tableRuntime.beginCommitting();
   }
   ```
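   
   A minimal self-contained model of the effect, assuming `pollTask()` scans `tableQueue` in order and takes the first pending task it finds. `ProcessModel`, `QueueModel`, `offerAlways`, and `offerOrCommit` are hypothetical names for illustration, not Amoro APIs:
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.List;
   import java.util.Optional;

   // Hypothetical stand-in for TableOptimizingProcess: only the pending task queue matters here.
   final class ProcessModel {
     final Deque<String> taskQueue = new ArrayDeque<>();

     ProcessModel(List<String> pendingTasks) {
       this.taskQueue.addAll(pendingTasks);
     }

     // True when nothing is left to schedule (all tasks already succeeded).
     boolean allTasksPrepared() {
       return taskQueue.isEmpty();
     }
   }

   // Hypothetical stand-in for OptimizingQueue: tableQueue plus a scanning pollTask().
   final class QueueModel {
     final List<ProcessModel> tableQueue = new ArrayList<>();
     int processesVisited = 0; // how many queued processes pollTask() has inspected so far

     Optional<String> pollTask() {
       for (ProcessModel p : tableQueue) {
         processesVisited++;
         String task = p.taskQueue.poll(); // null when this process has no pending task
         if (task != null) {
           return Optional.of(task);
         }
       }
       return Optional.empty();
     }

     // Behaviour in the PR: the process is offered even when it has no tasks left.
     void offerAlways(ProcessModel p) {
       tableQueue.add(p);
     }

     // First option above: skip tableQueue; returns false so the caller triggers the commit.
     boolean offerOrCommit(ProcessModel p) {
       if (p.allTasksPrepared()) {
         return false;
       }
       tableQueue.add(p);
       return true;
     }
   }
   ```
   
   With `offerAlways`, the empty shell is visited on every poll and is never dropped from `tableQueue`; with `offerOrCommit`, it never enters the poll rotation.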



##########
amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java:
##########
@@ -434,22 +433,22 @@ public void completeProcess(boolean success) {
 
   public void completeEmptyProcess() {
     OptimizingStatus originalStatus = getOptimizingStatus();
-    boolean needUpdate =
-        originalStatus == OptimizingStatus.PLANNING || originalStatus == OptimizingStatus.PENDING;
-    if (needUpdate) {
-      store()
-          .begin()
-          .updateStatusCode(code -> OptimizingStatus.IDLE.getCode())
-          .updateState(
-              OPTIMIZING_STATE_KEY,
-              state -> {
-                state.setLastOptimizedSnapshotId(state.getCurrentSnapshotId());
-                state.setLastOptimizedChangeSnapshotId(state.getCurrentChangeSnapshotId());
-                return state;
-              })
-          .updateState(PENDING_INPUT_KEY, any -> new AbstractOptimizingEvaluator.PendingInput())
-          .commit();
+    if (originalStatus == OptimizingStatus.IDLE) {
+      return;

Review Comment:
   **Caution: Expanded scope of `completeEmptyProcess()` may have unintended side effects**
   
   The original implementation only allowed `PLANNING` and `PENDING` states to be reset to `IDLE`. The new version resets **any** non-IDLE state (including `*_OPTIMIZING` and `COMMITTING`).
   
   While this is needed by `resetTableForRecovery()` (which guards with its own IDLE/PENDING check), `completeEmptyProcess()` is a `public` method. Any existing or future caller that invokes it on a `*_OPTIMIZING` table will now silently reset it to IDLE and mark the current snapshot as "optimized"; previously this was a no-op.
   
   Consider either:
   1. Adding a comment documenting the new broader contract, or
   2. Making `resetTableForRecovery()` use a dedicated internal method, keeping `completeEmptyProcess()` scoped to its original states to avoid accidentally resetting actively-optimizing tables from other call sites.
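   
   A sketch of option 2, assuming the reset logic can be factored into a shared private helper. `Status`, `TableState`, `resetToIdleForRecovery()`, and `resetToIdle()` are hypothetical names; only `completeEmptyProcess()` and the status names come from the diff, and the state update is reduced to a single snapshot field:
   
   ```java
   import java.util.EnumSet;
   import java.util.Set;

   // Hypothetical subset of OptimizingStatus, for illustration only.
   enum Status { IDLE, PENDING, PLANNING, MINOR_OPTIMIZING, COMMITTING }

   // Hypothetical stand-in for the relevant part of DefaultTableRuntime.
   final class TableState {
     private static final Set<Status> EMPTY_PROCESS_STATES =
         EnumSet.of(Status.PLANNING, Status.PENDING);

     Status status;
     long currentSnapshotId;
     long lastOptimizedSnapshotId;

     TableState(Status status, long currentSnapshotId) {
       this.status = status;
       this.currentSnapshotId = currentSnapshotId;
     }

     // Public contract kept as before the PR: only PLANNING/PENDING are reset, anything else is a no-op.
     public boolean completeEmptyProcess() {
       if (!EMPTY_PROCESS_STATES.contains(status)) {
         return false;
       }
       resetToIdle();
       return true;
     }

     // Hypothetical recovery-only method: resets any non-IDLE state; package-private, not public API.
     boolean resetToIdleForRecovery() {
       if (status == Status.IDLE) {
         return false;
       }
       resetToIdle();
       return true;
     }

     // Shared effect of both paths: back to IDLE with the current snapshot marked as optimized.
     private void resetToIdle() {
       status = Status.IDLE;
       lastOptimizedSnapshotId = currentSnapshotId;
     }
   }
   ```
   
   With this split, calling `completeEmptyProcess()` on a `MINOR_OPTIMIZING` table returns `false` and leaves it untouched, while recovery can still reset it explicitly.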



##########
amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java:
##########
@@ -114,8 +114,7 @@ public DefaultTableRuntime(TableRuntimeStore store) {
   }
 
   public void recover(OptimizingProcess optimizingProcess) {
-    if (!getOptimizingStatus().isProcessing()
-        || !Objects.equals(optimizingProcess.getProcessId(), getProcessId())) {
+    if (!Objects.equals(optimizingProcess.getProcessId(), getProcessId())) {

Review Comment:
   **Note: This change is currently dead code**
   
   Since `recover()` is no longer called anywhere in the new `initTableRuntime()` logic, this modification has no effect in the current PR.
   
   However, if the missing `recover()` call is added back (see my comment on `OptimizingQueue.java`), this change may become necessary. The removed `!getOptimizingStatus().isProcessing()` check only rejects a table whose status is *not* processing; for a `*_OPTIMIZING` table the original guard already passes. So relaxing the guard here matters only if the resume path can call `recover()` while `isProcessing()` is false.
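   
   The guard logic can be checked in isolation. This sketch models only the condition shown in the diff; `RecoverGuard`, `oldRejects`, and `newRejects` are hypothetical names, and it assumes `recover()` rejects the process whenever the condition is true (the body of the `if` is not shown in the hunk):
   
   ```java
   import java.util.Objects;

   // Hypothetical model of only the guard condition in DefaultTableRuntime.recover().
   final class RecoverGuard {
     // Before the PR: reject unless the status is processing AND the process ids match.
     static boolean oldRejects(boolean isProcessing, long processId, long runtimeProcessId) {
       return !isProcessing || !Objects.equals(processId, runtimeProcessId);
     }

     // After the PR: reject only on a process-id mismatch; the status is ignored.
     static boolean newRejects(boolean isProcessing, long processId, long runtimeProcessId) {
       return !Objects.equals(processId, runtimeProcessId);
     }
   }
   ```
   
   The two guards disagree only when `isProcessing` is false and the ids match: `oldRejects(false, 1, 1)` is `true`, while `newRejects(false, 1, 1)` is `false`.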



##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java:
##########
@@ -129,42 +129,88 @@ public OptimizingQueue(
   }
 
   private void initTableRuntime(DefaultTableRuntime tableRuntime) {
-    TableOptimizingProcess process = null;
-    if (tableRuntime.getOptimizingStatus().isProcessing() && tableRuntime.getProcessId() != 0) {
-      TableProcessMeta meta =
-          getAs(
-              TableProcessMapper.class,
-              mapper -> mapper.getProcessMeta(tableRuntime.getProcessId()));
-      OptimizingProcessState state =
-          getAs(
-              OptimizingProcessMapper.class,
-              mapper -> mapper.getProcessState(tableRuntime.getProcessId()));
-      process = new TableOptimizingProcess(tableRuntime, meta, state);
-      tableRuntime.recover(process);
-    }
-
-    if (tableRuntime.getOptimizingConfig().isEnabled()) {
+    try {
+      TableOptimizingProcess process = loadProcess(tableRuntime);
+
+      if (!tableRuntime.getOptimizingConfig().isEnabled()) {
+        closeProcessIfRunning(process);
+        return;
+      }
+
       tableRuntime.resetTaskQuotas(
           System.currentTimeMillis() - AmoroServiceConstants.QUOTA_LOOK_BACK_TIME);
-      // Close the committing process to avoid duplicate commit on the table.
-      if (tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING) {
-        if (process != null) {
-          LOG.warn(
-              "Close the committing process {} on table {}",
+
+      if (canResumeProcess(process, tableRuntime)) {
+        tableQueue.offer(process);

Review Comment:
   **Bug (Critical): Missing `tableRuntime.recover(process)` causes `IllegalStateException` and NPE at runtime**
   
   In the original code, `tableRuntime.recover(process)` was called to set the `optimizingProcess` field on `DefaultTableRuntime`. This call has been removed entirely in the new code, so when a process is resumed, `tableRuntime.getOptimizingProcess()` returns `null`.
   
   This directly breaks the commit cycle. `OptimizingCommitExecutor.execute()` does:
   
   ```java
   Optional.of(tableRuntime)
       .map(t -> (DefaultTableRuntime) t)
       .map(DefaultTableRuntime::getOptimizingProcess)   // null here becomes Optional.empty()
       .orElseThrow(() -> new IllegalStateException(
           "OptimizingProcess is null while committing:" + tableRuntime))
       .commit();
   ```
   
   So in the `allTasksPrepared` recovery path, after `beginCommitting()` is called, the commit executor will throw `IllegalStateException` because `getOptimizingProcess()` returns `null`. The table will remain stuck, which is exactly the problem this PR aims to fix.
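   
   The failure follows from `Optional.map` semantics: when the mapping function returns `null`, `map` yields `Optional.empty()`, so `orElseThrow` fires. A minimal reproduction; `RuntimeModel` and `CommitPath` are hypothetical stand-ins, not Amoro classes, and the chain mirrors the snippet quoted above:
   
   ```java
   import java.util.Optional;

   // Hypothetical stand-in for DefaultTableRuntime; without recover(), the field stays null.
   final class RuntimeModel {
     Object optimizingProcess;

     Object getOptimizingProcess() {
       return optimizingProcess;
     }
   }

   final class CommitPath {
     // Same shape as the executor's chain: a null process turns into an empty Optional.
     static Object processForCommit(RuntimeModel runtime) {
       return Optional.of(runtime)
           .map(RuntimeModel::getOptimizingProcess)
           .orElseThrow(() -> new IllegalStateException("OptimizingProcess is null while committing"));
     }
   }
   ```
   
   Calling `processForCommit(new RuntimeModel())` throws `IllegalStateException`; once the field is set (the role `recover()` plays), the same call returns the process.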
   
   Additionally, this affects other callers:
   - `DefaultTableMaintainerContext.getTargetSnapshotId()` → NPE when `isProcessing()` is true
   - Cancel process via API → silently fails (returns `false`)
   - Disabling self-optimizing → running process is not properly closed
   
   **Suggested fix:**
   
   ```java
   if (canResumeProcess(process, tableRuntime)) {
       tableRuntime.recover(process);  // <-- add this line
       tableQueue.offer(process);
       if (process.allTasksPrepared()) {
           // ...
       }
   }
   ```



-- 
This is an automated message from the Apache Git Service.