imbajin commented on code in PR #2937:
URL:
https://github.com/apache/incubator-hugegraph/pull/2937#discussion_r2804208363
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java:
##########
@@ -284,14 +296,41 @@ protected <V> void initTaskParams(HugeTask<V> task) {
}
}
+ /**
+ * Note: This method will update the status of the input task.
+ *
+ * @param task
+ * @param <V>
+ */
@Override
public <V> void cancel(HugeTask<V> task) {
- // Update status to CANCELLING
- if (!task.completed()) {
- // Task not completed, can only execute status not CANCELLING
- this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
+ E.checkArgumentNotNull(task, "Task can't be null");
+
+ if (task.completed() || task.cancelling()) {
+ return;
+ }
+
+ LOG.info("Cancel task '{}' in status {}", task.id(), task.status());
+
+ // Check if task is running locally, cancel it directly if so
+ HugeTask<?> runningTask = this.runningTasks.get(task.id());
+ if (runningTask != null) {
+ boolean cancelled = runningTask.cancel(true);
+ if (cancelled) {
+ task.overwriteStatus(TaskStatus.CANCELLED);
+ }
+ LOG.info("Cancel local running task '{}' result: {}", task.id(),
cancelled);
+ return;
+ }
+
+ // Task not running locally, update status to CANCELLING
+ // for cronSchedule() or other nodes to handle
+ TaskStatus currentStatus = task.status();
+ if (!this.updateStatus(task.id(), currentStatus,
TaskStatus.CANCELLING)) {
Review Comment:
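‼️ Using `currentStatus` for the CAS update here has an obvious race: between the
caller reading the task and `cancel()` executing, the task may move from `QUEUED` to
`RUNNING`. In that case `updateStatus(..., currentStatus, CANCELLING)` fails, the method
returns immediately, and the cancel request is silently dropped. Suggest that after a
CAS failure we read the latest status and do one fallback update (move the task to
`CANCELLING` as long as it has not completed).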
```suggestion
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
HugeTask<?> latest = this.taskWithoutResult(task.id());
if (!latest.completed() && !latest.cancelling()) {
this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
task.overwriteStatus(TaskStatus.CANCELLING);
}
} else {
task.overwriteStatus(TaskStatus.CANCELLING);
}
```
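To make the race concrete, here is a minimal standalone sketch. It does not use HugeGraph classes: `CancelCasSketch`, its `Status` enum, and the in-memory `AtomicReference` are hypothetical stand-ins for the task, `TaskStatus`, and the status stored in the backend. It also differs from the suggestion above in one respect: instead of an unconditional update after the failed CAS, it retries the compare-and-set against the latest status, which is one possible way to keep the fallback itself race-free.

```java
import java.util.concurrent.atomic.AtomicReference;

public class CancelCasSketch {

    // Hypothetical, reduced stand-in for TaskStatus
    public enum Status { QUEUED, RUNNING, CANCELLING, SUCCESS }

    // Stand-in for the status persisted in the backend store
    private final AtomicReference<Status> stored;

    public CancelCasSketch(Status initial) {
        this.stored = new AtomicReference<>(initial);
    }

    public Status status() {
        return this.stored.get();
    }

    private static boolean completed(Status status) {
        return status == Status.SUCCESS;
    }

    /**
     * Try to move the task to CANCELLING.
     *
     * @param seenByCaller the (possibly stale) status the caller read earlier
     * @return true if the task is in CANCELLING when the method returns
     */
    public boolean cancel(Status seenByCaller) {
        // First attempt: CAS against the status the caller observed
        if (this.stored.compareAndSet(seenByCaller, Status.CANCELLING)) {
            return true;
        }
        // CAS failed: the status changed in between, so re-read and retry
        Status latest = this.stored.get();
        while (!completed(latest) && latest != Status.CANCELLING) {
            if (this.stored.compareAndSet(latest, Status.CANCELLING)) {
                return true;
            }
            latest = this.stored.get();
        }
        // Either someone else already set CANCELLING, or the task completed
        return latest == Status.CANCELLING;
    }
}
```

With a task that is actually `RUNNING` while the caller still believes it is `QUEUED`, `cancel(Status.QUEUED)` ends with the task in `CANCELLING` rather than dropping the request. A task that has already completed is left untouched.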
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java:
##########
@@ -211,30 +206,9 @@ public <V> Future<?> schedule(HugeTask<V> task) {
return this.submitTask(task);
}
- // Check this is on master for normal task schedule
- this.checkOnMasterNode("schedule");
- if (this.serverManager().onlySingleNode() && !task.computer()) {
- /*
- * Speed up for single node, submit the task immediately,
- * this code can be removed without affecting code logic
- */
- task.status(TaskStatus.QUEUED);
- task.server(this.serverManager().selfNodeId());
- this.save(task);
- return this.submitTask(task);
- } else {
- /*
- * Just set the SCHEDULING status and save the task,
- * it will be scheduled by periodic scheduler worker
- */
- task.status(TaskStatus.SCHEDULING);
- this.save(task);
-
- // Notify master server to schedule and execute immediately
- TaskManager.instance().notifyNewTask(task);
-
- return task;
- }
+ task.status(TaskStatus.QUEUED);
Review Comment:
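‼️ Normal tasks are now set to `QUEUED` and submitted directly, but `task.server` is no
longer written. Combined with `restoreTasks()` no longer filtering by server, in a
multi-instance deployment on a shared backend (e.g. MySQL/PostgreSQL/Cassandra) every
node may restore and execute the same batch of pending tasks on restart, causing
duplicate execution. Suggest keeping the owner binding, so that at least the restore
phase can be isolated by owner.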
```suggestion
task.status(TaskStatus.QUEUED);
task.server(this.serverManager().selfNodeId());
this.save(task);
return this.submitTask(task);
```
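As an illustration of why the owner binding matters at restore time, here is a minimal standalone sketch. `RestoreByOwnerSketch`, `PendingTask`, and `tasksToRestore` are hypothetical names and not part of HugeGraph. The list stands in for the pending tasks read from a shared backend, and the filter stands in for the per-server check that `restoreTasks()` used to apply.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreByOwnerSketch {

    // Hypothetical minimal task record: an id plus the node that owns it
    public static final class PendingTask {
        public final long id;
        public final String server;

        public PendingTask(long id, String server) {
            this.id = id;
            this.server = server;
        }
    }

    /**
     * Pick the pending tasks that the given node should restore.
     * Tasks without an owner (server == null) are never selected here,
     * since no node can claim them without risking duplicate execution.
     */
    public static List<Long> tasksToRestore(List<PendingTask> pending,
                                            String selfNodeId) {
        List<Long> ids = new ArrayList<>();
        for (PendingTask task : pending) {
            if (selfNodeId.equals(task.server)) {
                ids.add(task.id);
            }
        }
        return ids;
    }
}
```

With two nodes reading the same pending list, each one restores only the tasks it owns, so the restored sets are disjoint. Without `task.server`, the filter has nothing to match on and every node would have to either restore everything or nothing.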
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]