Copilot commented on code in PR #2937:
URL:
https://github.com/apache/incubator-hugegraph/pull/2937#discussion_r2681005122
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java:
##########
@@ -273,173 +241,39 @@ public <V> void initTaskCallable(HugeTask<V> task) {
@Override
public synchronized <V> void cancel(HugeTask<V> task) {
E.checkArgumentNotNull(task, "Task can't be null");
- this.checkOnMasterNode("cancel");
if (task.completed() || task.cancelling()) {
return;
}
LOG.info("Cancel task '{}' in status {}", task.id(), task.status());
- if (task.server() == null) {
- // The task not scheduled to workers, set canceled immediately
- assert task.status().code() < TaskStatus.QUEUED.code();
- if (task.status(TaskStatus.CANCELLED)) {
- this.save(task);
- return;
- }
- } else if (task.status(TaskStatus.CANCELLING)) {
- // The task scheduled to workers, let the worker node to cancel
+ HugeTask<?> memTask = this.tasks.get(task.id());
+ if (memTask != null) {
+ boolean cancelled = memTask.cancel(true);
+ LOG.info("Task '{}' cancel result: {}", task.id(), cancelled);
Review Comment:
The cancel method no longer saves the task to storage when cancelling a task
that's in memory. If the task is running in memory and gets cancelled via
memTask.cancel(true), the cancelled status won't be persisted to storage until
the task naturally completes. This could cause issues if the server restarts
before the task finishes - the task would be restored and re-executed. Consider
saving the task status to storage after successfully calling
memTask.cancel(true).
```suggestion
LOG.info("Task '{}' cancel result: {}", task.id(), cancelled);
if (cancelled) {
this.save(memTask);
}
```
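For illustration only, the restart scenario described above can be reproduced with a small self-contained sketch. Every name in it (`CancelPersistenceSketch`, `Store`, `Scheduler`, `persistOnCancel`) is a hypothetical stand-in and not a HugeGraph class; the sketch assumes that a restarted node re-runs any task whose stored status is still `RUNNING`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; these are NOT HugeGraph classes.
final class CancelPersistenceSketch {

    enum Status { RUNNING, CANCELLED }

    /** Simulates the backend store, which survives a restart. */
    static final class Store {
        private final Map<String, Status> rows = new HashMap<>();

        void save(String id, Status status) {
            this.rows.put(id, status);
        }

        Status load(String id) {
            return this.rows.get(id);
        }
    }

    /** Simulates a scheduler whose in-memory task map is lost on restart. */
    static final class Scheduler {
        private final Store store;
        private final Map<String, Status> memTasks = new HashMap<>();
        private final boolean persistOnCancel;

        Scheduler(Store store, boolean persistOnCancel) {
            this.store = store;
            this.persistOnCancel = persistOnCancel;
        }

        void submit(String id) {
            this.memTasks.put(id, Status.RUNNING);
            this.store.save(id, Status.RUNNING);
        }

        void cancel(String id) {
            if (this.memTasks.containsKey(id)) {
                // Cancel the in-memory task
                this.memTasks.put(id, Status.CANCELLED);
                if (this.persistOnCancel) {
                    // The step the review comment asks for
                    this.store.save(id, Status.CANCELLED);
                }
            }
        }

        /** Assumption: restore re-runs tasks still stored as RUNNING. */
        boolean wouldRerunAfterRestart(String id) {
            return this.store.load(id) == Status.RUNNING;
        }
    }

    static boolean rerunsAfterRestart(boolean persistOnCancel) {
        Store store = new Store();
        Scheduler scheduler = new Scheduler(store, persistOnCancel);
        scheduler.submit("task-1");
        scheduler.cancel("task-1");
        // Simulate a restart: fresh in-memory state, same backend store
        Scheduler restarted = new Scheduler(store, persistOnCancel);
        return restarted.wouldRerunAfterRestart("task-1");
    }

    public static void main(String[] args) {
        System.out.println("without save: rerun=" + rerunsAfterRestart(false));
        System.out.println("with save: rerun=" + rerunsAfterRestart(true));
    }
}
```

In this sketch the cancelled task is picked up again after the simulated restart only when the cancel path skips the save, which is the gap the suggestion is meant to close.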
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java:
##########
@@ -146,19 +137,17 @@ private TaskTransaction tx() {
@Override
public <V> void restoreTasks() {
- Id selfServer = this.serverManager().selfNodeId();
List<HugeTask<V>> taskList = new ArrayList<>();
// Restore 'RESTORING', 'RUNNING' and 'QUEUED' tasks in order.
+ // Single-node mode: restore all pending tasks without server filtering
Review Comment:
The comment mentions "Single-node mode" but this change applies regardless
of deployment mode. Since the PR description states that the scheduler type now
follows storage type (distributed for hstore, otherwise local), this comment
should be updated to accurately reflect that server filtering has been removed,
not just for single-node deployments.
```suggestion
// Restore all pending tasks without server filtering
// (applies to all deployment modes)
```
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java:
##########
@@ -739,9 +573,7 @@ public <V> V call(Callable<V> callable) {
}
Review Comment:
The checkOnMasterNode method now only contains a comment and no longer
performs any validation. While this change aligns with the PR's goal of
removing master-role logic, the method is still being called in the codebase.
Consider either removing this method entirely or at least adding a deprecation
notice if it's being kept for API compatibility.
```suggestion
/**
* Check whether the current node is allowed to perform the specified
* operation. This method previously enforced master-role validation but
* now performs no checks.
*
* @deprecated This method no longer performs any validation and is kept
* only for API compatibility. New code should not call it.
*/
@Deprecated
```
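As a rough sketch of what keeping the method for compatibility could look like, the block below pairs the deprecation notice with an empty body. The class `DeprecatedNoOpSketch` and its nested `Scheduler` are hypothetical and only mirror the method name from the diff; the real signature and visibility in `StandardTaskScheduler` may differ.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for illustration; this is NOT the HugeGraph class.
final class DeprecatedNoOpSketch {

    static final class Scheduler {

        /**
         * Previously enforced master-role validation.
         *
         * @deprecated Performs no validation; kept only so that existing
         *             callers still compile. New code should not call it.
         */
        @Deprecated
        void checkOnMasterNode(String op) {
            // Intentionally empty: master-role logic has been removed
        }
    }

    /** Confirms via reflection that the annotation is present at runtime. */
    static boolean isMarkedDeprecated() {
        try {
            Method method = Scheduler.class.getDeclaredMethod(
                            "checkOnMasterNode", String.class);
            return method.isAnnotationPresent(Deprecated.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("deprecated=" + isMarkedDeprecated());
    }
}
```

With the annotation in place, javac reports a deprecation warning at each remaining call site, which makes the leftover callers easy to find before the method is removed.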
##########
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java:
##########
@@ -273,173 +241,39 @@ public <V> void initTaskCallable(HugeTask<V> task) {
@Override
public synchronized <V> void cancel(HugeTask<V> task) {
E.checkArgumentNotNull(task, "Task can't be null");
- this.checkOnMasterNode("cancel");
if (task.completed() || task.cancelling()) {
return;
}
LOG.info("Cancel task '{}' in status {}", task.id(), task.status());
- if (task.server() == null) {
- // The task not scheduled to workers, set canceled immediately
- assert task.status().code() < TaskStatus.QUEUED.code();
- if (task.status(TaskStatus.CANCELLED)) {
- this.save(task);
- return;
- }
- } else if (task.status(TaskStatus.CANCELLING)) {
- // The task scheduled to workers, let the worker node to cancel
+ HugeTask<?> memTask = this.tasks.get(task.id());
+ if (memTask != null) {
+ boolean cancelled = memTask.cancel(true);
+ LOG.info("Task '{}' cancel result: {}", task.id(), cancelled);
+ return;
+ }
+
+ if (task.status(TaskStatus.CANCELLED)) {
this.save(task);
- assert task.server() != null : task;
- assert this.serverManager().selfIsMaster();
- if (!task.server().equals(this.serverManager().selfNodeId())) {
- /*
- * Remove the task from memory if it's running on worker node,
- * but keep the task in memory if it's running on master node.
- * Cancel-scheduling will read the task from backend store, if
- * removed this instance from memory, there will be two task
- * instances with the same id, and can't cancel the real task that
- * is running but removed from memory.
- */
- this.remove(task);
- }
- // Notify master server to schedule and execute immediately
- TaskManager.instance().notifyNewTask(task);
return;
}
throw new HugeException("Can't cancel task '%s' in status %s",
- task.id(), task.status());
+ task.id(), task.status());
}
@Override
public ServerInfoManager serverManager() {
return this.serverManager;
}
- protected synchronized void scheduleTasksOnMaster() {
- // Master server schedule all scheduling tasks to suitable worker nodes
- Collection<HugeServerInfo> serverInfos = this.serverManager().allServerInfos();
- String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
- do {
- Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page);
- while (tasks.hasNext()) {
- HugeTask<?> task = tasks.next();
- if (task.server() != null) {
- // Skip if already scheduled
- continue;
- }
-
- if (!this.serverManager.selfIsMaster()) {
- return;
- }
-
- HugeServerInfo server = this.serverManager().pickWorkerNode(serverInfos, task);
- if (server == null) {
- LOG.info("The master can't find suitable servers to " +
- "execute task '{}', wait for next schedule", task.id());
- continue;
- }
-
- // Found suitable server, update task status
- assert server.id() != null;
- task.server(server.id());
- task.status(TaskStatus.SCHEDULED);
- this.save(task);
-
- // Update server load in memory, it will be saved at the ending
- server.increaseLoad(task.load());
-
- LOG.info("Scheduled task '{}' to server '{}'", task.id(), server.id());
- }
- if (page != null) {
- page = PageInfo.pageInfo(tasks);
- }
- } while (page != null);
-
- // Save to store
- this.serverManager().updateServerInfos(serverInfos);
- }
-
- protected void executeTasksOnWorker(Id server) {
- String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
- do {
- Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page);
- while (tasks.hasNext()) {
- HugeTask<?> task = tasks.next();
- this.initTaskCallable(task);
- Id taskServer = task.server();
- if (taskServer == null) {
- LOG.warn("Task '{}' may not be scheduled", task.id());
- continue;
- }
- HugeTask<?> memTask = this.tasks.get(task.id());
- if (memTask != null) {
- assert memTask.status().code() > task.status().code();
- continue;
- }
- if (taskServer.equals(server)) {
- task.status(TaskStatus.QUEUED);
- this.save(task);
- this.submitTask(task);
- }
- }
- if (page != null) {
- page = PageInfo.pageInfo(tasks);
- }
- } while (page != null);
- }
-
- protected void cancelTasksOnWorker(Id server) {
- String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
- do {
- Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page);
- while (tasks.hasNext()) {
- HugeTask<?> task = tasks.next();
- Id taskServer = task.server();
- if (taskServer == null) {
- LOG.warn("Task '{}' may not be scheduled", task.id());
- continue;
- }
- if (!taskServer.equals(server)) {
- continue;
- }
- /*
- * Task may be loaded from backend store and not initialized.
- * like: A task is completed but failed to save in the last
- * step, resulting in the status of the task not being
- * updated to storage, the task is not in memory, so it's not
- * initialized when canceled.
- */
- HugeTask<?> memTask = this.tasks.get(task.id());
- if (memTask != null) {
- task = memTask;
- } else {
- this.initTaskCallable(task);
- }
- boolean cancelled = task.cancel(true);
- LOG.info("Server '{}' cancel task '{}' with cancelled={}",
- server, task.id(), cancelled);
- }
- if (page != null) {
- page = PageInfo.pageInfo(tasks);
- }
- } while (page != null);
- }
-
@Override
public void taskDone(HugeTask<?> task) {
this.remove(task);
-
- Id selfServerId = this.serverManager().selfNodeId();
- try {
- this.serverManager().decreaseLoad(task.load());
- } catch (Throwable e) {
- LOG.error("Failed to decrease load for task '{}' on server '{}'",
- task.id(), selfServerId, e);
- }
- LOG.debug("Task '{}' done on server '{}'", task.id(), selfServerId);
+ // Single-node mode: no need to manage load
Review Comment:
The comment mentions "Single-node mode" but this scheduler can now also be
used in distributed mode (when storage type is hstore). The comment should be
updated to reflect that this is a simplified scheduler that doesn't manage
server load, regardless of deployment mode.
```suggestion
// Simplified scheduler: does not manage server load in any mode
```
##########
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java:
##########
@@ -557,13 +553,16 @@ public void testGremlinJobAndCancel() throws TimeoutException {
scheduler.cancel(task);
task = scheduler.task(task.id());
- Assert.assertEquals(TaskStatus.CANCELLING, task.status());
+ System.out.println(scheduler.getClass());
Review Comment:
Debug output should be removed. This System.out.println statement was likely
added during debugging and should not be committed, even in test code.
```suggestion
```
##########
hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java:
##########
@@ -32,17 +29,16 @@
import org.apache.hugegraph.job.EphemeralJobBuilder;
import org.apache.hugegraph.job.GremlinJob;
import org.apache.hugegraph.job.JobBuilder;
-import org.apache.hugegraph.task.HugeTask;
-import org.apache.hugegraph.task.TaskCallable;
-import org.apache.hugegraph.task.TaskScheduler;
-import org.apache.hugegraph.task.TaskStatus;
+import org.apache.hugegraph.task.*;
Review Comment:
Wildcard import should be avoided. Replace "import
org.apache.hugegraph.task.*;" with explicit imports for the specific classes
being used (HugeTask, TaskCallable, TaskScheduler, TaskStatus,
DistributedTaskScheduler).
```suggestion
import org.apache.hugegraph.task.DistributedTaskScheduler;
import org.apache.hugegraph.task.HugeTask;
import org.apache.hugegraph.task.TaskCallable;
import org.apache.hugegraph.task.TaskScheduler;
import org.apache.hugegraph.task.TaskStatus;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]