Copilot commented on code in PR #2937:
URL: 
https://github.com/apache/incubator-hugegraph/pull/2937#discussion_r2681005122


##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java:
##########
@@ -273,173 +241,39 @@ public <V> void initTaskCallable(HugeTask<V> task) {
     @Override
     public synchronized <V> void cancel(HugeTask<V> task) {
         E.checkArgumentNotNull(task, "Task can't be null");
-        this.checkOnMasterNode("cancel");
 
         if (task.completed() || task.cancelling()) {
             return;
         }
 
         LOG.info("Cancel task '{}' in status {}", task.id(), task.status());
 
-        if (task.server() == null) {
-            // The task not scheduled to workers, set canceled immediately
-            assert task.status().code() < TaskStatus.QUEUED.code();
-            if (task.status(TaskStatus.CANCELLED)) {
-                this.save(task);
-                return;
-            }
-        } else if (task.status(TaskStatus.CANCELLING)) {
-            // The task scheduled to workers, let the worker node to cancel
+        HugeTask<?> memTask = this.tasks.get(task.id());
+        if (memTask != null) {
+            boolean cancelled = memTask.cancel(true);
+            LOG.info("Task '{}' cancel result: {}", task.id(), cancelled);

Review Comment:
   The cancel method no longer saves the task to storage when cancelling a task 
that's in memory. If the task is running in memory and gets cancelled via 
memTask.cancel(true), the cancelled status won't be persisted to storage until 
the task naturally completes. This could cause issues if the server restarts 
before the task finishes - the task would be restored and re-executed. Consider 
saving the task status to storage after successfully calling 
memTask.cancel(true).
   ```suggestion
               LOG.info("Task '{}' cancel result: {}", task.id(), cancelled);
               if (cancelled) {
                   this.save(memTask);
               }
   ```



##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java:
##########
@@ -146,19 +137,17 @@ private TaskTransaction tx() {
 
     @Override
     public <V> void restoreTasks() {
-        Id selfServer = this.serverManager().selfNodeId();
         List<HugeTask<V>> taskList = new ArrayList<>();
         // Restore 'RESTORING', 'RUNNING' and 'QUEUED' tasks in order.
+        // Single-node mode: restore all pending tasks without server filtering

Review Comment:
   The comment mentions "Single-node mode" but this change applies regardless 
of deployment mode. Since the PR description states that the scheduler type now 
follows storage type (distributed for hstore, otherwise local), this comment 
should be updated to accurately reflect that server filtering has been removed, 
not just for single-node deployments.
   ```suggestion
           // Restore all pending tasks without server filtering (applies to 
all deployment modes)
   ```



##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java:
##########
@@ -739,9 +573,7 @@ public <V> V call(Callable<V> callable) {
     }
 

Review Comment:
   The checkOnMasterNode method now only contains a comment and no longer 
performs any validation. While this change aligns with the PR's goal of 
removing master-role logic, the method is still being called in the codebase. 
Consider either removing this method entirely or at least adding a deprecation 
notice if it's being kept for API compatibility.
   ```suggestion
   
       /**
        * Check whether the current node is allowed to perform the specified
        * operation. This method previously enforced master-role validation but
        * now performs no checks in single-node mode.
        *
        * @deprecated This method no longer performs any validation and is kept
        *             only for API compatibility. New code should not call it.
        */
       @Deprecated
   ```



##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java:
##########
@@ -273,173 +241,39 @@ public <V> void initTaskCallable(HugeTask<V> task) {
     @Override
     public synchronized <V> void cancel(HugeTask<V> task) {
         E.checkArgumentNotNull(task, "Task can't be null");
-        this.checkOnMasterNode("cancel");
 
         if (task.completed() || task.cancelling()) {
             return;
         }
 
         LOG.info("Cancel task '{}' in status {}", task.id(), task.status());
 
-        if (task.server() == null) {
-            // The task not scheduled to workers, set canceled immediately
-            assert task.status().code() < TaskStatus.QUEUED.code();
-            if (task.status(TaskStatus.CANCELLED)) {
-                this.save(task);
-                return;
-            }
-        } else if (task.status(TaskStatus.CANCELLING)) {
-            // The task scheduled to workers, let the worker node to cancel
+        HugeTask<?> memTask = this.tasks.get(task.id());
+        if (memTask != null) {
+            boolean cancelled = memTask.cancel(true);
+            LOG.info("Task '{}' cancel result: {}", task.id(), cancelled);
+            return;
+        }
+
+        if (task.status(TaskStatus.CANCELLED)) {
             this.save(task);
-            assert task.server() != null : task;
-            assert this.serverManager().selfIsMaster();
-            if (!task.server().equals(this.serverManager().selfNodeId())) {
-                /*
-                 * Remove the task from memory if it's running on worker node,
-                 * but keep the task in memory if it's running on master node.
-                 * Cancel-scheduling will read the task from backend store, if
-                 * removed this instance from memory, there will be two task
-                 * instances with the same id, and can't cancel the real task 
that
-                 * is running but removed from memory.
-                 */
-                this.remove(task);
-            }
-            // Notify master server to schedule and execute immediately
-            TaskManager.instance().notifyNewTask(task);
             return;
         }
 
         throw new HugeException("Can't cancel task '%s' in status %s",
-                                task.id(), task.status());
+                task.id(), task.status());
     }
 
     @Override
     public ServerInfoManager serverManager() {
         return this.serverManager;
     }
 
-    protected synchronized void scheduleTasksOnMaster() {
-        // Master server schedule all scheduling tasks to suitable worker nodes
-        Collection<HugeServerInfo> serverInfos = 
this.serverManager().allServerInfos();
-        String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
-        do {
-            Iterator<HugeTask<Object>> tasks = 
this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page);
-            while (tasks.hasNext()) {
-                HugeTask<?> task = tasks.next();
-                if (task.server() != null) {
-                    // Skip if already scheduled
-                    continue;
-                }
-
-                if (!this.serverManager.selfIsMaster()) {
-                    return;
-                }
-
-                HugeServerInfo server = 
this.serverManager().pickWorkerNode(serverInfos, task);
-                if (server == null) {
-                    LOG.info("The master can't find suitable servers to " +
-                             "execute task '{}', wait for next schedule", 
task.id());
-                    continue;
-                }
-
-                // Found suitable server, update task status
-                assert server.id() != null;
-                task.server(server.id());
-                task.status(TaskStatus.SCHEDULED);
-                this.save(task);
-
-                // Update server load in memory, it will be saved at the ending
-                server.increaseLoad(task.load());
-
-                LOG.info("Scheduled task '{}' to server '{}'", task.id(), 
server.id());
-            }
-            if (page != null) {
-                page = PageInfo.pageInfo(tasks);
-            }
-        } while (page != null);
-
-        // Save to store
-        this.serverManager().updateServerInfos(serverInfos);
-    }
-
-    protected void executeTasksOnWorker(Id server) {
-        String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
-        do {
-            Iterator<HugeTask<Object>> tasks = 
this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page);
-            while (tasks.hasNext()) {
-                HugeTask<?> task = tasks.next();
-                this.initTaskCallable(task);
-                Id taskServer = task.server();
-                if (taskServer == null) {
-                    LOG.warn("Task '{}' may not be scheduled", task.id());
-                    continue;
-                }
-                HugeTask<?> memTask = this.tasks.get(task.id());
-                if (memTask != null) {
-                    assert memTask.status().code() > task.status().code();
-                    continue;
-                }
-                if (taskServer.equals(server)) {
-                    task.status(TaskStatus.QUEUED);
-                    this.save(task);
-                    this.submitTask(task);
-                }
-            }
-            if (page != null) {
-                page = PageInfo.pageInfo(tasks);
-            }
-        } while (page != null);
-    }
-
-    protected void cancelTasksOnWorker(Id server) {
-        String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
-        do {
-            Iterator<HugeTask<Object>> tasks = 
this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page);
-            while (tasks.hasNext()) {
-                HugeTask<?> task = tasks.next();
-                Id taskServer = task.server();
-                if (taskServer == null) {
-                    LOG.warn("Task '{}' may not be scheduled", task.id());
-                    continue;
-                }
-                if (!taskServer.equals(server)) {
-                    continue;
-                }
-                /*
-                 * Task may be loaded from backend store and not initialized.
-                 * like: A task is completed but failed to save in the last
-                 * step, resulting in the status of the task not being
-                 * updated to storage, the task is not in memory, so it's not
-                 * initialized when canceled.
-                 */
-                HugeTask<?> memTask = this.tasks.get(task.id());
-                if (memTask != null) {
-                    task = memTask;
-                } else {
-                    this.initTaskCallable(task);
-                }
-                boolean cancelled = task.cancel(true);
-                LOG.info("Server '{}' cancel task '{}' with cancelled={}",
-                         server, task.id(), cancelled);
-            }
-            if (page != null) {
-                page = PageInfo.pageInfo(tasks);
-            }
-        } while (page != null);
-    }
-
     @Override
     public void taskDone(HugeTask<?> task) {
         this.remove(task);
-
-        Id selfServerId = this.serverManager().selfNodeId();
-        try {
-            this.serverManager().decreaseLoad(task.load());
-        } catch (Throwable e) {
-            LOG.error("Failed to decrease load for task '{}' on server '{}'",
-                      task.id(), selfServerId, e);
-        }
-        LOG.debug("Task '{}' done on server '{}'", task.id(), selfServerId);
+        // Single-node mode: no need to manage load

Review Comment:
   The comment mentions "Single-node mode" but this scheduler can now also be 
used in distributed mode (when storage type is hstore). The comment should be 
updated to reflect that this is a simplified scheduler that doesn't manage 
server load, regardless of deployment mode.
   ```suggestion
           // Simplified scheduler: does not manage server load in any mode
   ```



##########
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java:
##########
@@ -557,13 +553,16 @@ public void testGremlinJobAndCancel() throws 
TimeoutException {
         scheduler.cancel(task);
 
         task = scheduler.task(task.id());
-        Assert.assertEquals(TaskStatus.CANCELLING, task.status());
+        System.out.println(scheduler.getClass());

Review Comment:
   Debug output should be removed. This System.out.println statement was likely 
added during debugging and should not be committed to production code.
   ```suggestion
   
   ```



##########
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java:
##########
@@ -32,17 +29,16 @@
 import org.apache.hugegraph.job.EphemeralJobBuilder;
 import org.apache.hugegraph.job.GremlinJob;
 import org.apache.hugegraph.job.JobBuilder;
-import org.apache.hugegraph.task.HugeTask;
-import org.apache.hugegraph.task.TaskCallable;
-import org.apache.hugegraph.task.TaskScheduler;
-import org.apache.hugegraph.task.TaskStatus;
+import org.apache.hugegraph.task.*;

Review Comment:
   Wildcard import should be avoided. Replace "import 
org.apache.hugegraph.task.*;" with explicit imports for the specific classes 
being used (HugeTask, TaskCallable, TaskScheduler, TaskStatus, 
DistributedTaskScheduler).
   ```suggestion
   import org.apache.hugegraph.task.DistributedTaskScheduler;
   import org.apache.hugegraph.task.HugeTask;
   import org.apache.hugegraph.task.TaskCallable;
   import org.apache.hugegraph.task.TaskScheduler;
   import org.apache.hugegraph.task.TaskStatus;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to