imbajin commented on code in PR #2937:
URL:
https://github.com/apache/incubator-hugegraph/pull/2937#discussion_r2804295905
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java:
##########
@@ -253,6 +261,10 @@ public <V> Future<?> schedule(HugeTask<V> task) {
return this.ephemeralTaskExecutor.submit(task);
}
+ // Validate task state before saving to ensure correct exception type
+ E.checkState(task.type() != null, "Task type can't be null");
+ E.checkState(task.name() != null, "Task name can't be null");
+
// Process schema task
// Handle gremlin task
// Handle OLAP calculation tasks
Review Comment:
⚠️ **Concurrency issue: missing volatile or synchronization protection**
At `DistributedTaskScheduler.java:268-270`:
```java
if (this.closed.get()) {
return;
}
```
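**Problem**:
A `closed` check was added, but the values of `this.graph.started()` and `this.graph.closed()`
may already have changed by the time they are called later, which is a TOCTOU (time-of-check to time-of-use) problem.
**Suggestion**: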
```suggestion
if (this.closed.get() || !this.graph.started() || this.graph.closed()) {
return;
}
```
Merge all state checks into a single `if` statement to narrow the race window.
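Merging the checks narrows the race window but cannot close it, because the state can still change between the check and the work that follows. A minimal sketch of one way to close it, assuming a hypothetical `GuardedScheduler` class (not part of HugeGraph) in which the check, the guarded work, and `close()` all take the same lock:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a scheduler; all names here are illustrative.
class GuardedScheduler {
    private final ReentrantLock lock = new ReentrantLock();
    private boolean closed = false;
    private int runs = 0;

    // Check and act under one lock, so close() cannot slip in between.
    boolean runOnce() {
        lock.lock();
        try {
            if (closed) {
                return false;
            }
            runs++; // stands in for the scheduled work
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Takes the same lock, so it waits for any in-flight runOnce().
    void close() {
        lock.lock();
        try {
            closed = true;
        } finally {
            lock.unlock();
        }
    }

    int runs() {
        lock.lock();
        try {
            return runs;
        } finally {
            lock.unlock();
        }
    }
}
```

The trade-off is that `close()` blocks for as long as one run takes, so this only fits if a single run is short.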
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java:
##########
@@ -316,14 +355,18 @@ protected <V> HugeTask<V> deleteFromDB(Id id) {
@Override
public <V> HugeTask<V> delete(Id id, boolean force) {
- if (!force) {
- // Change status to DELETING, perform the deletion operation through automatic
- // scheduling.
+ HugeTask<?> task = this.taskWithoutResult(id);
+
+ if (!force && !task.completed()) {
+ // Check task status: can't delete running tasks without force
this.updateStatus(id, null, TaskStatus.DELETING);
return null;
- } else {
- return this.deleteFromDB(id);
+ // Already in DELETING status, delete directly from DB
+ // Completed tasks can also be deleted directly
Review Comment:
‼️ **Task deletion logic error**
At `DistributedTaskScheduler.java:355-366`:
```java
if (!force && !task.completed()) {
this.updateStatus(id, null, TaskStatus.DELETING);
return null;
// The code below can never be executed
}
```
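**Problem**:
1. The comment "Already in DELETING status" on line 360 is placed after `return null` and can never be reached
2. The logic is unclear: it should distinguish between "incomplete tasks" and "tasks already in DELETING status"
**Suggestion**: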
```suggestion
HugeTask<?> task = this.taskWithoutResult(id);
if (!force && !task.completed()) {
// Non-force mode: mark incomplete tasks as DELETING for async cleanup
this.updateStatus(id, null, TaskStatus.DELETING);
return null;
}
// Force mode OR task is completed OR already DELETING: delete directly
return this.deleteFromDB(id);
```
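The branch structure in this suggestion can be written as a small pure function. This is a rough sketch with hypothetical names (`DeleteDecision`, `Action`), not code from the PR. Note that a task "already in DELETING status" only reaches the direct-delete branch in non-force mode if `completed()` returns true for it, which is an assumption worth verifying against `TaskStatus`.

```java
// Hypothetical model of the delete() branch; names are illustrative only.
class DeleteDecision {
    enum Action { MARK_DELETING, DELETE_FROM_DB }

    // force:     caller asked for immediate deletion
    // completed: what task.completed() would report for the task
    static Action decide(boolean force, boolean completed) {
        if (!force && !completed) {
            // Non-force mode on an incomplete task: defer to async cleanup
            return Action.MARK_DELETING;
        }
        // Force mode, or the task counts as completed: delete directly
        return Action.DELETE_FROM_DB;
    }
}
```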
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java:
##########
@@ -353,6 +396,18 @@ public boolean close() {
cronFuture.cancel(false);
}
+ // Wait for cron task to complete to ensure all transactions are closed
+ try {
+ cronFuture.get(schedulePeriod + 5, TimeUnit.SECONDS);
+ } catch (CancellationException e) {
+ // Task was cancelled, this is expected
+ LOG.debug("Cron task was cancelled");
+ } catch (TimeoutException e) {
+ LOG.warn("Cron task did not complete in time when closing scheduler");
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.warn("Exception while waiting for cron task to complete", e);
+ }
+
Review Comment:
⚠️ **Wait timeout may leak database transactions**
At `DistributedTaskScheduler.java:399-410`:
```java
cronFuture.get(schedulePeriod + 5, TimeUnit.SECONDS);
```
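**Problem**:
1. If the cron task times out, the subsequent `taskDbExecutor` cleanup may run while the cron task still holds a transaction
2. The comment says "ensure all transactions are closed", but this cannot be guaranteed on timeout
3. `schedulePeriod` may be large (e.g. 60 seconds), making the close wait too long
**Suggestion**: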
```suggestion
// Wait for cron task with reasonable timeout
long waitTime = Math.min(schedulePeriod + 5, 15); // Cap at 15 seconds
try {
cronFuture.get(waitTime, TimeUnit.SECONDS);
} catch (CancellationException e) {
LOG.debug("Cron task was cancelled");
} catch (TimeoutException e) {
LOG.warn("Cron task did not complete in {}s, proceeding with shutdown",
waitTime);
// Force interrupt the cron task thread if possible
cronFuture.cancel(true);
} catch (ExecutionException | InterruptedException e) {
LOG.warn("Exception while waiting for cron task to complete", e);
}
```
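The capped wait can be exercised on its own with plain `java.util.concurrent` types. Below is a minimal sketch, assuming a hypothetical helper class `BoundedWait` (not part of the PR); it reports what happened instead of logging, so the caller can decide whether it is safe to continue with cleanup.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; the 15-second cap mirrors the suggestion above.
class BoundedWait {
    enum Outcome { COMPLETED, CANCELLED, TIMED_OUT, FAILED, INTERRUPTED }

    static final long MAX_WAIT_SECONDS = 15;

    // Wait at most schedulePeriod + 5 seconds, but never more than the cap.
    static long cappedWait(long schedulePeriod) {
        return Math.min(schedulePeriod + 5, MAX_WAIT_SECONDS);
    }

    static Outcome await(Future<?> future, long waitSeconds) {
        try {
            future.get(waitSeconds, TimeUnit.SECONDS);
            return Outcome.COMPLETED;
        } catch (CancellationException e) {
            return Outcome.CANCELLED;
        } catch (TimeoutException e) {
            // Request interruption; the task only stops if it cooperates.
            future.cancel(true);
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            return Outcome.FAILED;
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            return Outcome.INTERRUPTED;
        }
    }
}
```

A caller could treat `TIMED_OUT` as a signal that the cron task may still hold a transaction and skip or defer the transaction cleanup accordingly.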
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java:
##########
@@ -363,7 +418,10 @@ public boolean close() {
this.graph.closeTx();
});
}
- return true;
+
+ //todo: serverInfoManager section should be removed in the future.
+ return this.serverManager().close();
+ //return true;
Review Comment:
🧹 **TODO comments should be tracked in an issue**
At `DistributedTaskScheduler.java:422-424`:
```java
//todo: serverInfoManager section should be removed in the future.
return this.serverManager().close();
//return true;
```
**Suggestion**:
Link the TODO to a GitHub issue so it can be tracked:
```suggestion
// TODO(#issue-number): Remove serverInfoManager completely after full single-node migration
return this.serverManager().close();
```
Also delete the commented-out code `//return true;`.
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java:
##########
@@ -284,14 +296,41 @@ protected <V> void initTaskParams(HugeTask<V> task) {
}
}
+ /**
+ * Note: This method will update the status of the input task.
+ *
+ * @param task
+ * @param <V>
+ */
@Override
public <V> void cancel(HugeTask<V> task) {
- // Update status to CANCELLING
- if (!task.completed()) {
- // Task not completed, can only execute status not CANCELLING
- this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
+ E.checkArgumentNotNull(task, "Task can't be null");
+
+ if (task.completed() || task.cancelling()) {
+ return;
+ }
+
+ LOG.info("Cancel task '{}' in status {}", task.id(), task.status());
+
+ // Check if task is running locally, cancel it directly if so
+ HugeTask<?> runningTask = this.runningTasks.get(task.id());
+ if (runningTask != null) {
+ boolean cancelled = runningTask.cancel(true);
+ if (cancelled) {
+ task.overwriteStatus(TaskStatus.CANCELLED);
+ }
+ LOG.info("Cancel local running task '{}' result: {}", task.id(), cancelled);
+ return;
+ }
+
+ // Task not running locally, update status to CANCELLING
+ // for cronSchedule() or other nodes to handle
+ TaskStatus currentStatus = task.status();
+ if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
+ LOG.info("Failed to cancel task '{}', status may have changed from {}",
Review Comment:
‼️ **Inconsistent task state after a failed status update**
At `DistributedTaskScheduler.java:323-330`:
```java
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
LOG.info("Failed to cancel task '{}', status may have changed from {}",
task.id(), currentStatus);
} else {
task.overwriteStatus(TaskStatus.CANCELLING);
}
```
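**Problem**:
- When `updateStatus` returns false (the database update failed), the status of the in-memory `task` object is not synchronized
- The caller may assume the cancel succeeded while the status in the database is unchanged
- Error handling is missing: the code should re-read the latest status or throw an exception
**Suggestion**: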
```suggestion
TaskStatus currentStatus = task.status();
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
// Status changed concurrently, reload from DB
HugeTask<?> latestTask = this.taskWithoutResult(task.id());
LOG.info("Failed to cancel task '{}': status changed from {} to {}",
task.id(), currentStatus, latestTask.status());
task.overwriteStatus(latestTask.status());
} else {
task.overwriteStatus(TaskStatus.CANCELLING);
}
```
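The pattern here is a compare-and-set followed by a reload when the expected status no longer matches. A self-contained sketch, assuming a hypothetical in-memory `Store` in place of the database and a simplified `Status` enum (neither is HugeGraph code):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of "update if status is as expected, else reload".
class CancelSketch {
    enum Status { QUEUED, RUNNING, CANCELLING, SUCCESS }

    // Stand-in for the persisted status; a real store would be a DB row.
    static final class Store {
        final AtomicReference<Status> status;

        Store(Status initial) {
            this.status = new AtomicReference<>(initial);
        }

        // Succeeds only if the stored status still equals `expected`.
        boolean updateStatus(Status expected, Status next) {
            return this.status.compareAndSet(expected, next);
        }
    }

    // Returns the status the caller should now hold in memory.
    static Status cancel(Store store, Status seen) {
        if (store.updateStatus(seen, Status.CANCELLING)) {
            return Status.CANCELLING;
        }
        // Lost the race: adopt whatever the store holds now.
        return store.status.get();
    }
}
```

Whether the caller should also be told that the cancel did not take effect (a return value or an exception) is a separate design choice that this sketch leaves open.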
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]