imbajin commented on code in PR #2937:
URL: https://github.com/apache/incubator-hugegraph/pull/2937#discussion_r2697146633


##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java:
##########
@@ -284,14 +295,41 @@ protected <V> void initTaskParams(HugeTask<V> task) {
         }
     }
 
+    /**
+     * Note: This method will update the status of the input task.
+     *
+     * @param task
+     * @param <V>
+     */
     @Override
     public <V> void cancel(HugeTask<V> task) {
-        // Update status to CANCELLING
-        if (!task.completed()) {
-            // Task not completed, can only execute status not CANCELLING
-            this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
+        E.checkArgumentNotNull(task, "Task can't be null");
+
+        if (task.completed() || task.cancelling()) {
+            return;
+        }
+
+        LOG.info("Cancel task '{}' in status {}", task.id(), task.status());
+
+        // Check if task is running locally, cancel it directly if so
+        HugeTask<?> runningTask = this.runningTasks.get(task.id());
+        if (runningTask != null) {
+            boolean cancelled = runningTask.cancel(true);
+            if (cancelled) {
+                task.overwriteStatus(TaskStatus.CANCELLED);
+            }
+            LOG.info("Cancel local running task '{}' result: {}", task.id(), cancelled);
+            return;
+        }
+
+        // Task not running locally, update status to CANCELLING
+        // for cronSchedule() or other nodes to handle
+        TaskStatus currentStatus = task.status();
+        if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
+            LOG.info("Failed to cancel task '{}', status may have changed from {}",
+                     task.id(), currentStatus);

Review Comment:
   ⚠️ **Resource shutdown order issue - may cause a transaction leak**
   
   In `DistributedTaskScheduler.close()` (lines 313-336), the logic that waits for the cron task to finish comes before `taskDbExecutor` is shut down:
   
   ```java
   try {
       cronFuture.get(schedulePeriod + 5, TimeUnit.SECONDS);
   } catch (CancellationException e) {
       LOG.debug("Cron task was cancelled");
   } catch (TimeoutException e) {
       LOG.warn("Cron task did not complete in time when closing scheduler");
   } catch (ExecutionException | InterruptedException e) {
       LOG.warn("Exception while waiting for cron task to complete", e);
   }
   ```
   
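   **Problems**:
   1. If the cron task times out, its subsequent database operations may still be running, but `taskDbExecutor` has already been shut down
   2. `InterruptedException` is caught, but the thread's interrupt status is not restored
   
   **Suggestion**: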
   ```suggestion
   } catch (ExecutionException | InterruptedException e) {
       LOG.warn("Exception while waiting for cron task to complete", e);
       if (e instanceof InterruptedException) {
           Thread.currentThread().interrupt(); // restore the interrupt status
       }
   }
   ```
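
A minimal, self-contained sketch of the same idea, using one catch block per exception type instead of an `instanceof` check. The class name `CronWaitSketch`, the method `awaitCron`, and the `WaitResult` enum are hypothetical names for illustration and do not come from the PR. Cancelling the future on timeout is an assumption about what the scheduler would want, not something the PR does.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CronWaitSketch {

    /** Outcome of waiting for the cron future (illustrative names only). */
    enum WaitResult { DONE, CANCELLED, TIMED_OUT, FAILED, INTERRUPTED }

    static WaitResult awaitCron(Future<?> cronFuture, long timeout, TimeUnit unit) {
        try {
            cronFuture.get(timeout, unit);
            return WaitResult.DONE;
        } catch (CancellationException e) {
            return WaitResult.CANCELLED;
        } catch (TimeoutException e) {
            // Assumption: interrupting the straggler is acceptable, so it does
            // not keep using an executor that is about to be shut down.
            cronFuture.cancel(true);
            return WaitResult.TIMED_OUT;
        } catch (ExecutionException e) {
            return WaitResult.FAILED;
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            return WaitResult.INTERRUPTED;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> quick = executor.submit(() -> { });
            System.out.println(awaitCron(quick, 1, TimeUnit.SECONDS)); // DONE

            Future<?> slow = executor.submit(() -> {
                Thread.sleep(10_000);
                return null;
            });
            System.out.println(awaitCron(slow, 50, TimeUnit.MILLISECONDS)); // TIMED_OUT
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Returning a result value lets `close()` decide whether it is safe to shut down the DB executor right away or whether it should first wait for the cancelled task to unwind.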



##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java:
##########
@@ -284,14 +295,41 @@ protected <V> void initTaskParams(HugeTask<V> task) {
         }

Review Comment:
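   ‼️ **Business logic design issue - the force-delete logic in `delete` is inconsistent**
   
   In `DistributedTaskScheduler.delete()` (lines 286-305), the deletion logic differs significantly from the original implementation:
   
   **Original logic**:
   - `force=false`: set the status to DELETING and return null
   - `force=true`: delete directly from the database
   
   **New logic**: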
   ```java
   if (!force) {
       if (!task.completed() && task.status() != TaskStatus.DELETING) {
           throw new IllegalArgumentException(
                   String.format("Can't delete incomplete task '%s' in status %s, " +
                                 "Please try to cancel the task first",
                                 id, task.status()));
       }
   }
   return this.deleteFromDB(id);
   ```
   
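   **Problems**:
   1. The logic that sets the DELETING status has been removed, which may break code that relies on scheduled cleanup
   2. A non-forced delete now removes completed tasks directly instead of first marking them as DELETING
   3. This may be inconsistent with the `StandardTaskScheduler` implementation
   
   **Suggestion**: Reconsider the deletion flow and keep it compatible with the original logic, or state this behavior change explicitly in the PR description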
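
To make the behavior change concrete, here is a rough, in-memory sketch of the two delete semantics as this comment describes them. Everything in it is hypothetical: the class `DeleteSemanticsSketch`, the reduced `Status` enum, the `Map`-backed store, and the `boolean` return values (the real methods work on `HugeTask` objects and the task database). It only models the control flow and is not HugeGraph's implementation.

```java
import java.util.HashMap;
import java.util.Map;

final class DeleteSemanticsSketch {

    /** Reduced, hypothetical status set; the real TaskStatus has more values. */
    enum Status { RUNNING, SUCCESS, DELETING }

    private final Map<String, Status> tasks = new HashMap<>();

    void put(String id, Status status) {
        this.tasks.put(id, status);
    }

    /** Returns null when the task is no longer stored. */
    Status status(String id) {
        return this.tasks.get(id);
    }

    private static boolean completed(Status status) {
        return status == Status.SUCCESS;
    }

    /** Original flow as described above: a non-forced delete only marks the task. */
    boolean deleteOld(String id, boolean force) {
        if (!this.tasks.containsKey(id)) {
            return false;
        }
        if (!force) {
            // Left for a periodic cleanup pass to remove later
            this.tasks.put(id, Status.DELETING);
            return false;
        }
        return this.tasks.remove(id) != null;
    }

    /** New flow as quoted above: reject incomplete tasks, otherwise remove at once. */
    boolean deleteNew(String id, boolean force) {
        Status status = this.tasks.get(id);
        if (!force && !completed(status) && status != Status.DELETING) {
            throw new IllegalArgumentException(String.format(
                      "Can't delete incomplete task '%s' in status %s", id, status));
        }
        return this.tasks.remove(id) != null;
    }
}
```

In this model, a non-forced delete of a completed task leaves a DELETING record under the old flow but removes the record immediately under the new one, which is the difference that problem 2 points at.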



##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java:
##########
@@ -284,14 +295,41 @@ protected <V> void initTaskParams(HugeTask<V> task) {
         }
     }
 
+    /**
+     * Note: This method will update the status of the input task.
+     *
+     * @param task
+     * @param <V>
+     */
     @Override
     public <V> void cancel(HugeTask<V> task) {
-        // Update status to CANCELLING
-        if (!task.completed()) {
-            // Task not completed, can only execute status not CANCELLING
-            this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
+        E.checkArgumentNotNull(task, "Task can't be null");
+
+        if (task.completed() || task.cancelling()) {
+            return;
+        }
+
+        LOG.info("Cancel task '{}' in status {}", task.id(), task.status());
+
+        // Check if task is running locally, cancel it directly if so
+        HugeTask<?> runningTask = this.runningTasks.get(task.id());
+        if (runningTask != null) {
+            boolean cancelled = runningTask.cancel(true);
+            if (cancelled) {
+                task.overwriteStatus(TaskStatus.CANCELLED);
+            }
+            LOG.info("Cancel local running task '{}' result: {}", task.id(), cancelled);
+            return;
+        }
+
+        // Task not running locally, update status to CANCELLING
+        // for cronSchedule() or other nodes to handle
+        TaskStatus currentStatus = task.status();
+        if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
+            LOG.info("Failed to cancel task '{}', status may have changed from {}",
+                     task.id(), currentStatus);
         } else {
-            LOG.info("cancel task({}) error, task has completed", task.id());
+            task.overwriteStatus(TaskStatus.CANCELLING);
         }
     }

Review Comment:
   ⚠️ **Code maintainability - the TODO comment should be more specific**
   
   The TODO comment on line 334 is too vague:
   
   ```java
   //todo: serverInfoManager section should be removed in the future.
   return this.serverManager().close();
   //return true;
   ```
   
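   **Problems**:
   1. It does not explain why serverInfoManager should be removed
   2. It does not state when, or under what preconditions, the removal should happen
   3. Commented-out code should be deleted rather than kept
   
   **Suggestion**: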
   ```suggestion
   // TODO(issue-XXX): Remove serverInfoManager.close() after migrating to 
   // pure single-node architecture. Currently kept for backward compatibility.
   return this.serverManager().close();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

