lintingbin commented on code in PR #4090:
URL: https://github.com/apache/amoro/pull/4090#discussion_r2852487023
##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java:
##########
@@ -129,42 +129,88 @@ public OptimizingQueue(
}
private void initTableRuntime(DefaultTableRuntime tableRuntime) {
- TableOptimizingProcess process = null;
- if (tableRuntime.getOptimizingStatus().isProcessing() && tableRuntime.getProcessId() != 0) {
- TableProcessMeta meta =
- getAs(
- TableProcessMapper.class,
- mapper -> mapper.getProcessMeta(tableRuntime.getProcessId()));
- OptimizingProcessState state =
- getAs(
- OptimizingProcessMapper.class,
- mapper -> mapper.getProcessState(tableRuntime.getProcessId()));
- process = new TableOptimizingProcess(tableRuntime, meta, state);
- tableRuntime.recover(process);
- }
-
- if (tableRuntime.getOptimizingConfig().isEnabled()) {
+ try {
+ TableOptimizingProcess process = loadProcess(tableRuntime);
+
+ if (!tableRuntime.getOptimizingConfig().isEnabled()) {
+ closeProcessIfRunning(process);
+ return;
+ }
+
tableRuntime.resetTaskQuotas(
System.currentTimeMillis() - AmoroServiceConstants.QUOTA_LOOK_BACK_TIME);
- // Close the committing process to avoid duplicate commit on the table.
- if (tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING) {
- if (process != null) {
- LOG.warn(
- "Close the committing process {} on table {}",
+
+ if (canResumeProcess(process, tableRuntime)) {
+ tableQueue.offer(process);
+ if (process.allTasksPrepared()) {
+ LOG.info(
+ "All tasks already completed for process {} on table {} during recovery,"
+ + " triggering commit",
process.getProcessId(),
tableRuntime.getTableIdentifier());
- process.close(false);
+ tableRuntime.beginCommitting();
Review Comment:
**Nit: Process left in `tableQueue` with no remaining tasks**
When `allTasksPrepared()` is true and `beginCommitting()` is called, the
process has already been added to `tableQueue` (line above), but its `taskQueue`
is empty because all tasks are `SUCCESS`. This "empty shell" process will sit in
`tableQueue` until the commit finishes:
- `pollTask()` will iterate over it on every poll but never get a task from it
- It won't be cleaned up until `clearProcess()` is called after a commit
Consider either not adding it to `tableQueue` when all tasks are already
completed, or removing it when triggering `beginCommitting()`:
```java
if (process.allTasksPrepared()) {
tableQueue.remove(process);
// ...
tableRuntime.beginCommitting();
}
```
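The cost of the empty-shell entry can be illustrated with a minimal, self-contained sketch. Everything in it (`EmptyShellSketch`, `Process`, the `scans` counter, the `skipWhenPrepared` flag) is a hypothetical stand-in and not Amoro's real `OptimizingQueue`. It only assumes that `pollTask()` walks `tableQueue` in order and that a fully prepared process has no tasks left to hand out.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of the queue behaviour described above; not Amoro code.
class EmptyShellSketch {
  static class Process {
    final long id;
    final Queue<String> taskQueue = new ArrayDeque<>();

    Process(long id, String... pendingTasks) {
      this.id = id;
      taskQueue.addAll(Arrays.asList(pendingTasks));
    }

    // Assumption: "prepared" means no task is left to schedule.
    boolean allTasksPrepared() {
      return taskQueue.isEmpty();
    }
  }

  final Queue<Process> tableQueue = new ConcurrentLinkedQueue<>();
  int scans = 0; // how many processes pollTask() has inspected so far

  // Walks every queued process until one yields a task.
  String pollTask() {
    for (Process process : tableQueue) {
      scans++;
      String task = process.taskQueue.poll();
      if (task != null) {
        return task;
      }
    }
    return null;
  }

  // Resume path: optionally skip enqueueing a process that only needs a commit.
  void resume(Process process, boolean skipWhenPrepared) {
    if (skipWhenPrepared && process.allTasksPrepared()) {
      return;
    }
    tableQueue.offer(process);
  }

  public static void main(String[] args) {
    EmptyShellSketch current = new EmptyShellSketch();
    current.resume(new Process(1L), false);
    current.pollTask();
    current.pollTask();
    System.out.println("scans with empty shell queued: " + current.scans);

    EmptyShellSketch suggested = new EmptyShellSketch();
    suggested.resume(new Process(1L), true);
    suggested.pollTask();
    suggested.pollTask();
    System.out.println("scans with shell skipped: " + suggested.scans);
  }
}
```

In this model each poll inspects the empty process once, so the wasted work grows with the number of polls made while the commit is outstanding. Skipping the `offer` avoids it entirely.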
##########
amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java:
##########
@@ -434,22 +433,22 @@ public void completeProcess(boolean success) {
public void completeEmptyProcess() {
OptimizingStatus originalStatus = getOptimizingStatus();
- boolean needUpdate =
- originalStatus == OptimizingStatus.PLANNING || originalStatus == OptimizingStatus.PENDING;
- if (needUpdate) {
- store()
- .begin()
- .updateStatusCode(code -> OptimizingStatus.IDLE.getCode())
- .updateState(
- OPTIMIZING_STATE_KEY,
- state -> {
- state.setLastOptimizedSnapshotId(state.getCurrentSnapshotId());
- state.setLastOptimizedChangeSnapshotId(state.getCurrentChangeSnapshotId());
- return state;
- })
- .updateState(PENDING_INPUT_KEY, any -> new AbstractOptimizingEvaluator.PendingInput())
- .commit();
+ if (originalStatus == OptimizingStatus.IDLE) {
+ return;
Review Comment:
**Caution: Expanded scope of `completeEmptyProcess()` may have unintended side effects**
The original implementation only allowed `PLANNING` and `PENDING` states to
be reset to `IDLE`. The new version resets **any** non-IDLE state (including
`*_OPTIMIZING` and `COMMITTING`).
While this is needed by `resetTableForRecovery()` (which guards with its own
IDLE/PENDING check), `completeEmptyProcess()` is a `public` method. Any
existing or future caller that invokes it on a `*_OPTIMIZING` table will now
silently reset it to IDLE and mark the current snapshot as "optimized" — this
was previously a no-op.
Consider either:
1. Adding a comment documenting the new broader contract, or
2. Making `resetTableForRecovery` use a dedicated internal method, keeping
`completeEmptyProcess()` scoped to its original states to avoid accidentally
resetting actively-optimizing tables from other call sites.
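
Option 2 could look roughly like the following sketch. The class name, the reduced `Status` enum, and the `resetForRecovery()` method are hypothetical and only illustrate the split; the real method would also update snapshot IDs and pending input through `store()`, which is omitted here.

```java
// Hypothetical stand-in for the table runtime; the enum is a reduced,
// illustrative subset of statuses, not Amoro's full OptimizingStatus.
class ResetScopeSketch {
  enum Status { IDLE, PENDING, PLANNING, MINOR_OPTIMIZING, COMMITTING }

  private Status status;

  ResetScopeSketch(Status initial) {
    this.status = initial;
  }

  Status status() {
    return status;
  }

  // Public entry point keeps the original contract: only PLANNING and
  // PENDING are reset, every other status is a no-op.
  public boolean completeEmptyProcess() {
    if (status != Status.PLANNING && status != Status.PENDING) {
      return false;
    }
    resetToIdle();
    return true;
  }

  // Recovery-only path (package-private here): may reset any non-IDLE status.
  boolean resetForRecovery() {
    if (status == Status.IDLE) {
      return false;
    }
    resetToIdle();
    return true;
  }

  // Shared state change used by both paths.
  private void resetToIdle() {
    status = Status.IDLE;
  }

  public static void main(String[] args) {
    ResetScopeSketch optimizing = new ResetScopeSketch(Status.MINOR_OPTIMIZING);
    System.out.println(optimizing.completeEmptyProcess() + " " + optimizing.status());
    System.out.println(optimizing.resetForRecovery() + " " + optimizing.status());
  }
}
```

With this split, a stray call to `completeEmptyProcess()` on an optimizing table stays a no-op, while the recovery code opts into the broader reset explicitly.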
##########
amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java:
##########
@@ -114,8 +114,7 @@ public DefaultTableRuntime(TableRuntimeStore store) {
}
public void recover(OptimizingProcess optimizingProcess) {
- if (!getOptimizingStatus().isProcessing()
- || !Objects.equals(optimizingProcess.getProcessId(), getProcessId())) {
+ if (!Objects.equals(optimizingProcess.getProcessId(), getProcessId())) {
Review Comment:
**Note: This change is currently dead code**
Since `recover()` is no longer called anywhere in the new
`initTableRuntime()` logic, this modification has no effect in the current PR.
However, if the missing `recover()` call is added back (see my comment on
`OptimizingQueue.java`), this change starts to matter. For a table whose
`isProcessing()` status is true (e.g., `*_OPTIMIZING`), the old and new guards
behave identically, since both reduce to the process ID check. The relaxed
guard only makes a difference when the resume path calls `recover()` on a
table whose status is not a processing one, so it is worth confirming that
this is the case the change is meant to cover.
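
Going only by the two conditions shown in the diff, the old and new guards can be compared side by side. The sketch below is a hypothetical model: the method names are made up, and it only evaluates whether each condition is true, not what `recover()` does when it is.

```java
import java.util.Objects;

// Hypothetical comparison of the two guard conditions from the diff.
class RecoverGuardSketch {
  // Old condition: true when the status is not processing or the IDs differ.
  static boolean oldGuardRejects(boolean isProcessing, long processId, long runtimeProcessId) {
    return !isProcessing || !Objects.equals(processId, runtimeProcessId);
  }

  // New condition: true only when the IDs differ.
  static boolean newGuardRejects(long processId, long runtimeProcessId) {
    return !Objects.equals(processId, runtimeProcessId);
  }

  public static void main(String[] args) {
    boolean[] processingStates = {true, false};
    long[][] idPairs = {{7L, 7L}, {7L, 8L}};
    for (boolean isProcessing : processingStates) {
      for (long[] ids : idPairs) {
        System.out.printf(
            "isProcessing=%b idsMatch=%b old=%b new=%b%n",
            isProcessing,
            ids[0] == ids[1],
            oldGuardRejects(isProcessing, ids[0], ids[1]),
            newGuardRejects(ids[0], ids[1]));
      }
    }
  }
}
```

The two conditions disagree in exactly one row: the status is not processing and the IDs match.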
##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java:
##########
@@ -129,42 +129,88 @@ public OptimizingQueue(
}
private void initTableRuntime(DefaultTableRuntime tableRuntime) {
- TableOptimizingProcess process = null;
- if (tableRuntime.getOptimizingStatus().isProcessing() && tableRuntime.getProcessId() != 0) {
- TableProcessMeta meta =
- getAs(
- TableProcessMapper.class,
- mapper -> mapper.getProcessMeta(tableRuntime.getProcessId()));
- OptimizingProcessState state =
- getAs(
- OptimizingProcessMapper.class,
- mapper -> mapper.getProcessState(tableRuntime.getProcessId()));
- process = new TableOptimizingProcess(tableRuntime, meta, state);
- tableRuntime.recover(process);
- }
-
- if (tableRuntime.getOptimizingConfig().isEnabled()) {
+ try {
+ TableOptimizingProcess process = loadProcess(tableRuntime);
+
+ if (!tableRuntime.getOptimizingConfig().isEnabled()) {
+ closeProcessIfRunning(process);
+ return;
+ }
+
tableRuntime.resetTaskQuotas(
System.currentTimeMillis() - AmoroServiceConstants.QUOTA_LOOK_BACK_TIME);
- // Close the committing process to avoid duplicate commit on the table.
- if (tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING) {
- if (process != null) {
- LOG.warn(
- "Close the committing process {} on table {}",
+
+ if (canResumeProcess(process, tableRuntime)) {
+ tableQueue.offer(process);
Review Comment:
**Bug (Critical): Missing `tableRuntime.recover(process)`, which will cause `IllegalStateException` and NPE at runtime**
In the original code, `tableRuntime.recover(process)` was called to set the
`optimizingProcess` field on `DefaultTableRuntime`. This call has been
completely removed in the new code. As a result, when a process is resumed,
`tableRuntime.getOptimizingProcess()` will return `null`.
This directly breaks the commit cycle. `OptimizingCommitExecutor.execute()`
does:
```java
Optional.of(tableRuntime)
.map(t -> (DefaultTableRuntime) t)
.map(DefaultTableRuntime::getOptimizingProcess) // returns null!
.orElseThrow(() -> new IllegalStateException(
"OptimizingProcess is null while committing:" + tableRuntime))
.commit();
```
So in the `allTasksPrepared` recovery path, after `beginCommitting()` is
called, the commit executor will throw `IllegalStateException` because
`getOptimizingProcess()` returns `null`. The table will remain stuck — the
exact problem this PR aims to fix.
Additionally, this affects other callers:
- `DefaultTableMaintainerContext.getTargetSnapshotId()` → NPE when
`isProcessing()` is true
- Cancel process via API → silently fails (returns `false`)
- Disabling self-optimizing → running process is not properly closed
**Suggested fix:**
```java
if (canResumeProcess(process, tableRuntime)) {
tableRuntime.recover(process); // <-- add this line
tableQueue.offer(process);
if (process.allTasksPrepared()) {
// ...
}
}
```
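
The failure mode follows from how `java.util.Optional` works: `map` returns an empty `Optional` when the mapping function returns `null`, so `orElseThrow` fires. The sketch below reproduces that with hypothetical stand-ins (`OptionalNullSketch`, `TableRuntimeStub`, and a string in place of the real process object); it is not the actual `OptimizingCommitExecutor`.

```java
import java.util.Optional;

// Hypothetical reproduction of the commit lookup; not Amoro code.
class OptionalNullSketch {
  static class TableRuntimeStub {
    private Object optimizingProcess; // stays null unless recover(...) is called

    void recover(Object process) {
      this.optimizingProcess = process;
    }

    Object getOptimizingProcess() {
      return optimizingProcess;
    }
  }

  // Mirrors the Optional chain quoted above, with a string standing in for commit().
  static String commit(TableRuntimeStub tableRuntime) {
    return Optional.of(tableRuntime)
        .map(TableRuntimeStub::getOptimizingProcess) // a null result yields an empty Optional
        .map(process -> "committed " + process)
        .orElseThrow(
            () -> new IllegalStateException("OptimizingProcess is null while committing"));
  }

  public static void main(String[] args) {
    TableRuntimeStub runtime = new TableRuntimeStub();
    try {
      commit(runtime);
    } catch (IllegalStateException e) {
      System.out.println("without recover: " + e.getMessage());
    }

    runtime.recover("process-1");
    System.out.println("with recover: " + commit(runtime));
  }
}
```

Calling `recover(...)` before the commit path runs is what makes the second call succeed in this model.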
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.