Copilot commented on code in PR #336:
URL: https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2381807620


##########
vermeer/test/functional/load_local.go:
##########
@@ -43,3 +46,28 @@ func (lt *LoadTaskLocal) TaskLoadBody() map[string]string {
                "load.vertex_backend": vertexBackends[rand.Intn(len(vertexBackends))],
        }
 }
+
+// TaskLoadBodyWithNum creates load configuration with specified number of files.
+// If num <= 10, it will be automatically adjusted to 30 to ensure minimum test coverage.
+func (lt *LoadTaskLocal) TaskLoadBodyWithNum(num int) map[string]string {
+       vertexBackends := []string{"db", "mem"}
+
+       if num <= 10 {
+               num = 30
+       }

Review Comment:
   The magic numbers 10 and 30 should be defined as named constants. This improves
   maintainability and gives a place to document why these specific values were chosen.



##########
vermeer/apps/master/schedules/scheduler_task_manager.go:
##########
@@ -0,0 +1,269 @@
+package schedules
+
+import (
+       "errors"
+       "vermeer/apps/structure"
+
+       "github.com/sirupsen/logrus"
+)
+
+/*
+* @Description: SchedulerTaskManager is the manager for the scheduler task.
+* @Note: This is the manager for the scheduler task.
+ */
+type SchedulerTaskManager struct {
+       structure.MutexLocker
+       // This struct is responsible for managing tasks in the scheduling system.
+       // A map from task ID to TaskInfo can be used to track tasks.
+       allTaskMap   map[int32]*structure.TaskInfo
+       allTaskQueue []*structure.TaskInfo
+       // For debug or test, get task start sequence
+       startTaskQueue []*structure.TaskInfo
+       // onGoingTasks
+       notCompleteTasks map[int32]*structure.TaskInfo
+       // A map from task ID to worker group can be used to track which worker group is handling which task.
+       taskToworkerGroupMap map[int32]string
+}
+
+/*
+* @Description: Init initializes the SchedulerTaskManager.
+* @Note: This function will initialize the SchedulerTaskManager.
+* @Return *SchedulerTaskManager
+ */
+func (t *SchedulerTaskManager) Init() *SchedulerTaskManager {
+       t.allTaskMap = make(map[int32]*structure.TaskInfo)
+       t.notCompleteTasks = make(map[int32]*structure.TaskInfo)
+       t.taskToworkerGroupMap = make(map[int32]string)
+       return t
+}
+
+/*
+* @Description: QueueTask queues the task.
+* @Note: This function will queue the task.
+* @Param taskInfo
+* @Return bool, error
+ */
+func (t *SchedulerTaskManager) QueueTask(taskInfo *structure.TaskInfo) (bool, 
error) {
+       if taskInfo == nil {
+               return false, errors.New("the argument `taskInfo` is nil")
+       }
+
+       if taskInfo.SpaceName == "" {
+               return false, errors.New("the property `SpaceName` of taskInfo is empty")
+       }
+
+       defer t.Unlock(t.Lock())
+
+       // Add the task to the task map
+       t.allTaskMap[taskInfo.ID] = taskInfo
+       t.allTaskQueue = append(t.allTaskQueue, taskInfo)
+       t.notCompleteTasks[taskInfo.ID] = taskInfo
+       t.AssignGroup(taskInfo)
+       return true, nil
+}
+
+/*
+* @Description: RefreshTaskToWorkerGroupMap refreshes the task to worker group 
map.
+* @Note: This function will refresh the task to worker group map.
+ */
+func (t *SchedulerTaskManager) RefreshTaskToWorkerGroupMap() {
+       defer t.Unlock(t.Lock())
+
+       for _, taskInfo := range t.GetAllTasksNotComplete() {
+               if taskInfo == nil {
+                       continue
+               }
+               t.AssignGroup(taskInfo)
+               t.taskToworkerGroupMap[taskInfo.ID] = 
workerMgr.ApplyGroup(taskInfo.SpaceName, taskInfo.GraphName)
+       }
+}
+
+// Only for debug or test, get task start sequence
+/*
+* @Description: AddTaskStartSequence adds the task start sequence.
+* @Note: This function will add the task start sequence.
+* @Param taskID
+* @Return error
+ */
+func (t *SchedulerTaskManager) AddTaskStartSequence(taskID int32) error {
+       if _, exists := t.allTaskMap[taskID]; !exists {
+               return errors.New("task not found")
+       }
+       t.startTaskQueue = append(t.startTaskQueue, t.allTaskMap[taskID])
+       return nil
+}
+
+/*
+* @Description: RemoveTask removes the task.
+* @Note: This function will remove the task.
+* @Param taskID
+* @Return error
+ */
+func (t *SchedulerTaskManager) RemoveTask(taskID int32) error {
+       if _, exists := t.allTaskMap[taskID]; !exists {
+               return errors.New("task not found")
+       }
+       defer t.Unlock(t.Lock())
+       delete(t.allTaskMap, taskID)
+       // remove from queue
+       for i, task := range t.allTaskQueue {
+               if task.ID == taskID {
+                       t.allTaskQueue = append(t.allTaskQueue[:i], 
t.allTaskQueue[i+1:]...)
+                       break
+               }
+       }
+       delete(t.taskToworkerGroupMap, taskID)
+       delete(t.notCompleteTasks, taskID)
+       return nil
+}
+
+/*
+* @Description: MarkTaskComplete marks the task complete.
+* @Note: This function will mark the task complete.
+* @Param taskID
+* @Return error
+ */
+func (t *SchedulerTaskManager) MarkTaskComplete(taskID int32) error {
+       if _, exists := t.allTaskMap[taskID]; !exists {
+               return errors.New("task not found")
+       }
+       defer t.Unlock(t.Lock())
+       delete(t.notCompleteTasks, taskID)
+       return nil
+}
+
+// update or create a task in the task map
+/*
+* @Description: AssignGroup assigns the group.
+* @Note: This function will assign the group.
+* @Param taskInfo
+* @Return error
+ */
+func (t *SchedulerTaskManager) AssignGroup(taskInfo *structure.TaskInfo) error 
{
+       group := workerMgr.ApplyGroup(taskInfo.SpaceName, taskInfo.GraphName)
+       if group == "" {
+               return errors.New("failed to assign group for task")
+       }
+       t.taskToworkerGroupMap[taskInfo.ID] = group
+       return nil
+}
+
+/*
+* @Description: GetTaskByID gets the task by ID.
+* @Note: This function will get the task by ID.
+* @Param taskID
+* @Return *structure.TaskInfo, error
+ */
+func (t *SchedulerTaskManager) GetTaskByID(taskID int32) (*structure.TaskInfo, 
error) {
+       task, exists := t.allTaskMap[taskID]
+       if !exists {
+               return nil, errors.New("task not found")
+       }
+       return task, nil
+}
+
+/*
+* @Description: GetLastTask gets the last task.
+* @Note: This function will get the last task.
+* @Param spaceName
+* @Return *structure.TaskInfo
+ */
+func (t *SchedulerTaskManager) GetLastTask(spaceName string) 
*structure.TaskInfo {
+       // Implement logic to get the last task in the queue for the given space
+       if len(t.allTaskQueue) == 0 {
+               return nil
+       }
+       for i := len(t.allTaskQueue) - 1; i >= 0; i-- {
+               if t.allTaskQueue[i].SpaceName == spaceName {
+                       return t.allTaskQueue[i]
+               }
+       }
+       return nil
+}
+
+/*
+* @Description: GetAllTasks gets all tasks.
+* @Note: This function will get all tasks.
+* @Return []*structure.TaskInfo
+ */
+func (t *SchedulerTaskManager) GetAllTasks() []*structure.TaskInfo {
+       tasks := make([]*structure.TaskInfo, 0, len(t.allTaskMap))
+       for _, task := range t.allTaskMap {
+               tasks = append(tasks, task)
+       }
+       return tasks
+}
+
+func (t *SchedulerTaskManager) GetAllTasksNotComplete() []*structure.TaskInfo {
+       tasks := make([]*structure.TaskInfo, 0, len(t.allTaskMap))
+       for _, task := range t.notCompleteTasks {
+               tasks = append(tasks, task)
+       }
+       return tasks
+}
+
+func (t *SchedulerTaskManager) GetAllTasksWaitng() []*structure.TaskInfo {

Review Comment:
   The function name has a typo: 'GetAllTasksWaitng' should be 
'GetAllTasksWaiting'.
   ```suggestion
   func (t *SchedulerTaskManager) GetAllTasksWaiting() []*structure.TaskInfo {
   ```



##########
vermeer/apps/master/bl/task_bl.go:
##########
@@ -62,6 +64,53 @@ func (tb *TaskBl) CreateTaskInfo(
                return nil, err
        }
 
+       // for scheduler
+       taskInfo.Priority = 0
+       taskInfo.Preorders = make([]int32, 0)
+       taskInfo.Exclusive = true // default to true for now, can be set false 
by params
+       if params != nil {
+               if priority, ok := params["priority"]; ok {
+                       if p, err := strconv.ParseInt(priority, 10, 32); err == 
nil {
+                               if p < 0 {
+                                       return nil, fmt.Errorf("priority should be non-negative")
+                               }
+                               taskInfo.Priority = int32(p)
+                       } else {
+                               logrus.Warnf("priority convert to int32 error:%v", err)
+                               return nil, err
+                       }
+               }
+               if preorders, ok := params["preorders"]; ok {
+                       preorderList := strings.Split(preorders, ",")
+                       for _, preorder := range preorderList {
+                               if pid, err := strconv.ParseInt(preorder, 10, 
32); err == nil {
+                                       if taskMgr.GetTaskByID(int32(pid)) == nil {
+                                               return nil, fmt.Errorf("preorder task id %d not exists", pid)

Review Comment:
   The error message 'preorder task id %d not exists' has incorrect grammar. It 
should be 'preorder task with ID %d does not exist'.
   ```suggestion
                                                return nil, fmt.Errorf("preorder task with ID %d does not exist", pid)
   ```



##########
vermeer/apps/master/schedules/scheduler_algorithm_manager.go:
##########
@@ -0,0 +1,507 @@
+package schedules
+
+import (
+       "slices"
+       "sort"
+       "strconv"
+       "time"
+       "vermeer/apps/common"
+       "vermeer/apps/structure"
+
+       "github.com/sirupsen/logrus"
+)
+
+/*
+* @Description: SchedulerAlgorithm is the interface for the scheduler 
algorithm.
+* @Note: This is the interface for the scheduler algorithm.
+ */
+type SchedulerAlgorithm interface {
+       // Name returns the name of the SchedulerAlgorithm
+       Name() string
+       // Init initializes the SchedulerAlgorithm
+       Init()
+       // FilterNextTasks filters the next tasks to be scheduled based on the 
provided parameters
+       FilterNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap 
map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, 
softSchedule bool) ([]*structure.TaskInfo, error)
+       // ScheduleNextTasks schedules the next tasks based on the filtered 
tasks
+       ScheduleNextTasks(filteredTasks []*structure.TaskInfo, 
taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, 
concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, 
error)
+}
+
+/*
+* @Description: SchedulerAlgorithmManager is the manager for the scheduler 
algorithm.
+* @Note: This is the manager for the scheduler algorithm.
+ */
+type SchedulerAlgorithmManager struct {
+       filteredSchedulerAlgorithms  map[string]SchedulerAlgorithm
+       scheduledSchedulerAlgorithms map[string]SchedulerAlgorithm
+       dispatchPaused               bool
+}
+
+/*
+* @Description: Init initializes the SchedulerAlgorithmManager.
+* @Note: This function will initialize the SchedulerAlgorithmManager.
+ */
+// Need to put DependsSchedulerAlgorithm before WaitingSchedulerAlgorithm
+func (am *SchedulerAlgorithmManager) Init() {
+       am.filteredSchedulerAlgorithms = make(map[string]SchedulerAlgorithm)
+       am.scheduledSchedulerAlgorithms = make(map[string]SchedulerAlgorithm)
+       am.dispatchPaused = false
+       // Register filter and schedule algorithms
+       am.RegisterFilterAlgorithm(&DependsSchedulerAlgorithm{})
+       am.RegisterFilterAlgorithm(&WaitingSchedulerAlgorithm{})
+       // Register default SchedulerAlgorithms
+       am.RegisterSchedulerAlgorithm(&PriorityElderSchedulerAlgorithm{})
+}
+
+/*
+* @Description: RegisterSchedulerAlgorithm registers the scheduler algorithm.
+* @Note: This function will register the scheduler algorithm.
+* @Param schedulerAlgorithm
+ */
+func (am *SchedulerAlgorithmManager) 
RegisterSchedulerAlgorithm(schedulerAlgorithm SchedulerAlgorithm) {
+       if schedulerAlgorithm == nil {
+               return
+       }
+       name := schedulerAlgorithm.Name()
+       if _, exists := am.scheduledSchedulerAlgorithms[name]; exists {
+               return // SchedulerAlgorithm already registered
+       }
+
+       // only support one scheduling algorithm for now
+       if len(am.scheduledSchedulerAlgorithms) > 0 {
+               return // Only one scheduling algorithm can be registered
+       }
+       schedulerAlgorithm.Init()
+       am.scheduledSchedulerAlgorithms[name] = schedulerAlgorithm
+}
+
+/*
+* @Description: RegisterFilterAlgorithm registers the filter algorithm.
+* @Note: This function will register the filter algorithm.
+* @Param filterAlgorithm
+ */
+func (am *SchedulerAlgorithmManager) RegisterFilterAlgorithm(filterAlgorithm 
SchedulerAlgorithm) {
+       if filterAlgorithm == nil {
+               return
+       }
+       name := filterAlgorithm.Name()
+       if _, exists := am.filteredSchedulerAlgorithms[name]; exists {
+               return // SchedulerAlgorithm already registered
+       }
+       filterAlgorithm.Init()
+       am.filteredSchedulerAlgorithms[name] = filterAlgorithm
+}
+
+/*
+* @Description: IsDispatchPaused checks if the dispatch is paused.
+* @Note: This function will check if the dispatch is paused.
+* @Return bool
+ */
+func (am *SchedulerAlgorithmManager) IsDispatchPaused() bool {
+       return am.dispatchPaused
+}
+
+/*
+* @Description: PauseDispatch pauses the dispatch.
+* @Note: This function will pause the dispatch.
+ */
+func (am *SchedulerAlgorithmManager) PauseDispatch() {
+       am.dispatchPaused = true
+}
+
+/*
+* @Description: ResumeDispatch resumes the dispatch.
+* @Note: This function will resume the dispatch.
+ */
+func (am *SchedulerAlgorithmManager) ResumeDispatch() {
+       am.dispatchPaused = false
+}
+
+/*
+* @Description: ScheduleNextTasks schedules the next tasks.
+* @Note: This function will schedule the next tasks.
+* @Param allTasks
+* @Param taskToWorkerGroupMap
+* @Param idleWorkerGroups
+* @Param concurrentWorkerGroups
+* @Param softSchedule
+* @Return []*structure.TaskInfo, error
+ */
+// For all tasks, filter and schedule them
+// Only one scheduling algorithm is supported for now
+func (am *SchedulerAlgorithmManager) ScheduleNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       if am.dispatchPaused {
+               return nil, nil // No tasks to schedule if dispatch is paused
+       }
+
+       filteredTasks := allTasks
+       for _, algorithm := range am.filteredSchedulerAlgorithms {
+               var err error
+               filteredTasks, err = algorithm.FilterNextTasks(filteredTasks, 
taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule)
+               if err != nil {
+                       return nil, err
+               }
+       }
+       if len(filteredTasks) == 0 {
+               return nil, nil // No tasks to schedule after filtering
+       }
+
+       // only support one scheduling algorithm for now
+       // get first algorithm
+       for _, algorithm := range am.scheduledSchedulerAlgorithms {
+               tasks, err := algorithm.ScheduleNextTasks(filteredTasks, 
taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule)
+               if err != nil {
+                       return nil, err
+               }
+               return tasks, nil // Return the scheduled tasks
+       }
+
+       return nil, nil // No tasks scheduled
+}
+
+type FIFOSchedulerAlgorithm struct{}
+
+func (f *FIFOSchedulerAlgorithm) Name() string {
+       return "FIFO"
+}
+
+func (f *FIFOSchedulerAlgorithm) Init() {
+       // No specific initialization needed for FIFO
+       logrus.Info("Initializing FIFOSchedulerAlgorithm")
+}
+
+func (f *FIFOSchedulerAlgorithm) FilterNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       // just return the waiting tasks as is for FIFO
+       return allTasks, nil
+}
+
+func (f *FIFOSchedulerAlgorithm) ScheduleNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       if len(allTasks) == 0 {
+               return nil, nil // No tasks to schedule
+       }
+
+       // For FIFO, we simply return the available tasks in the order they are 
provided
+       for _, task := range allTasks {
+               if task.State != structure.TaskStateWaiting {
+                       continue // Only consider tasks that are in the waiting 
state
+               }
+               if group, exists := taskToWorkerGroupMap[task.ID]; exists && 
group != "" {
+                       // only support idle worker groups for now
+                       for _, idleGroup := range idleWorkerGroups {
+                               if group == idleGroup {
+                                       logrus.Debugf("Task %d is assigned to 
worker group %s", task.ID, group)
+                                       return []*structure.TaskInfo{task}, nil 
// Return the first task that can be scheduled
+                               }
+                       }
+               }
+       }
+
+       return nil, nil
+}
+
+type PrioritySchedulerAlgorithm struct{}
+
+func (p *PrioritySchedulerAlgorithm) Name() string {
+       return "Priority"
+}
+
+func (p *PrioritySchedulerAlgorithm) Init() {
+       // No specific initialization needed for Priority
+       logrus.Info("Initializing PrioritySchedulerAlgorithm")
+}
+
+func (p *PrioritySchedulerAlgorithm) FilterNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       // just return the waiting tasks as is for Priority
+       return allTasks, nil
+}
+
+func (p *PrioritySchedulerAlgorithm) ScheduleNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       if len(allTasks) == 0 {
+               return nil, nil // No tasks to schedule
+       }
+
+       // Sort tasks by priority (higher priority first)
+       sort.Slice(allTasks, func(i, j int) bool {
+               return allTasks[i].Priority > allTasks[j].Priority
+       })
+
+       for _, task := range allTasks {
+               if task.State != structure.TaskStateWaiting {
+                       continue // Only consider tasks that are in the waiting 
state
+               }
+               if group, exists := taskToWorkerGroupMap[task.ID]; exists && 
group != "" {
+                       // only support idle worker groups for now
+                       for _, idleGroup := range idleWorkerGroups {
+                               if group == idleGroup {
+                                       logrus.Debugf("Task %d is assigned to 
worker group %s", task.ID, group)
+                                       return []*structure.TaskInfo{task}, nil 
// Return the first task that can be scheduled
+                               }
+                       }
+               }
+       }
+
+       return nil, nil
+}
+
+type PriorityElderSchedulerAlgorithm struct {
+       ageParam         int64
+       priorityParam    int64
+       resourceParam    int64
+       randomValueParam int64
+}
+
+func (p *PriorityElderSchedulerAlgorithm) Name() string {
+       return "PriorityElder"
+}
+
+func (p *PriorityElderSchedulerAlgorithm) Init() {
+       logrus.Info("Initializing PriorityElderSchedulerAlgorithm")
+
+       // Initialize parameters with default values
+       defaultAgeParam := "1"
+       defaultPriorityParam := "1"
+       defaultResourceParam := "10000000000"
+       defaultRandomValueParam := "1" // Placeholder for any random value logic
+
+       // Load parameters from configuration
+       ageParam := common.GetConfigDefault("priority_elder_age_param", 
defaultAgeParam).(string)
+       priorityParam := 
common.GetConfigDefault("priority_elder_priority_param", 
defaultPriorityParam).(string)
+       resourceParam := 
common.GetConfigDefault("priority_elder_resource_param", 
defaultResourceParam).(string)
+       randomValueParam := 
common.GetConfigDefault("priority_elder_random_value_param", 
defaultRandomValueParam).(string)
+
+       ageParamInt, err := strconv.Atoi(ageParam)
+       if err != nil {
+               logrus.Errorf("failed to convert priority_elder_age_param to 
int: %v", err)
+               logrus.Infof("using default priority_elder_age_param: %s", 
defaultAgeParam)
+               ageParamInt, _ = strconv.Atoi(defaultAgeParam)
+       }
+       p.ageParam = int64(ageParamInt)
+       priorityParamInt, err := strconv.Atoi(priorityParam)
+       if err != nil {
+               logrus.Errorf("failed to convert priority_elder_priority_param 
to int: %v", err)
+               logrus.Infof("using default priority_elder_priority_param: %s", 
defaultPriorityParam)
+               priorityParamInt, _ = strconv.Atoi(defaultPriorityParam)
+       }
+       p.priorityParam = int64(priorityParamInt)
+       resourceParamInt, err := strconv.Atoi(resourceParam)
+       if err != nil {
+               logrus.Errorf("failed to convert priority_elder_resource_param 
to int: %v", err)
+               logrus.Infof("using default priority_elder_resource_param: %s", 
defaultResourceParam)
+               resourceParamInt, _ = strconv.Atoi(defaultResourceParam)
+       }
+       p.resourceParam = int64(resourceParamInt)
+       randomValueParamInt, err := strconv.Atoi(randomValueParam)
+       if err != nil {
+               logrus.Errorf("failed to convert 
priority_elder_random_value_param to int: %v", err)
+               logrus.Infof("using default priority_elder_random_value_param: 
%s", defaultRandomValueParam)
+               randomValueParamInt, _ = strconv.Atoi(defaultRandomValueParam)
+       }
+       p.randomValueParam = int64(randomValueParamInt)
+
+       logrus.Infof("PriorityElderSchedulerAlgorithm initialized with 
parameters: ageParam=%d, priorityParam=%d, resourceParam=%d, 
randomValueParam=%d", p.ageParam, p.priorityParam, p.resourceParam, 
p.randomValueParam)
+}
+
+func (p *PriorityElderSchedulerAlgorithm) FilterNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       // just return the waiting tasks as is for PriorityElder
+       return allTasks, nil
+}
+
+func (p *PriorityElderSchedulerAlgorithm) CalculateTaskEmergency(task 
*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, printValue bool) 
int64 {
+       // step 0: get params
+       ageParam := p.ageParam
+       priorityParam := p.priorityParam
+       resourceParam := p.resourceParam
+       randomValueParam := p.randomValueParam
+       // step 1: age
+       ageCost := ageParam * time.Since(task.CreateTime).Milliseconds() / 1000 
// in seconds
+       // step 2: priority
+       priorityCost := priorityParam * int64(task.Priority)
+       // step 3: resource cost
+       graph := structure.GraphManager.GetGraphByName(task.SpaceName, 
task.GraphName)
+       resourceCost := int64(0)
+       if graph == nil {
+               resourceCost = resourceParam // if graph not found, use max 
resource cost
+       } else {
+               resourceCost = resourceParam / max(1, graph.VertexCount+graph.EdgeCount) // Avoid division by zero, ensure at least 1

Review Comment:
   The calculation logic for resourceCost is complex and should be extracted 
into a separate method with proper documentation explaining the algorithm.



##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -72,81 +225,161 @@ func (s *ScheduleBl) QueueTask(taskInfo *structure.TaskInfo) (bool, error) {
                return false, errors.New("the property `SpaceName` of taskInfo is empty")
        }
 
-       //defer s.Unlock(s.Lock())
+       defer s.Unlock(s.Lock())
        if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err 
!= nil {
                return false, err
        }
 
+       logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID, 
taskInfo)
+
+       // check dependency if exists
+       if len(taskInfo.Preorders) > 0 {
+               for _, depTaskID := range taskInfo.Preorders {
+                       depTask := taskMgr.GetTaskByID(depTaskID)
+                       if depTask == nil {
+                               err := errors.New("the dependency task with ID " + strconv.Itoa(int(depTaskID)) + " does not exist")
+                               logrus.Error(err)
+                               taskMgr.SetError(taskInfo, err.Error())
+                               return false, err
+                       }
+               }
+       }
+
        // Notice: Ensure successful invocation.
-       ok, err := s.spaceQueue.PushTask(taskInfo)
+       // make sure all tasks have alloc to a worker group
+       ok, err := s.taskManager.QueueTask(taskInfo)
        if err != nil {
                taskMgr.SetError(taskInfo, err.Error())
                return ok, err
        }
 
-       go s.dispatch()
+       if s.cronManager.CheckCronExpression(taskInfo.CronExpr) == nil {
+               if err := s.cronManager.AddCronTask(taskInfo); err != nil {
+                       logrus.Errorf("failed to add cron task: %v", err)
+                       return false, err
+               }
+               logrus.Infof("added cron task for task '%d' with expression '%s'", taskInfo.ID, taskInfo.CronExpr)
+       }
 
        return ok, nil
 }
 
-func (s *ScheduleBl) CancelTask(taskInfo *structure.TaskInfo) error {
-       if taskInfo == nil {
-               return errors.New("the argument `taskInfo` is nil")
+/*
+* @Description: QueueTaskFromTemplate queues the task from the template.
+* @Note: This function will queue the task from the template. This function is 
used by cron tasks.
+* @Param template
+* @Return int32, error
+ */
+func (s *ScheduleBl) QueueTaskFromTemplate(template *structure.TaskInfo) 
(int32, error) {
+       if template == nil {
+               return -1, errors.New("the argument `template` is nil")
        }
 
-       s.Lock()
-       isHeadTask := s.spaceQueue.IsHeadTask(taskInfo.ID)
-       task := s.spaceQueue.RemoveTask(taskInfo.ID)
-       s.Unlock(nil)
+       bc := &baseCreator{}
+       taskInfo, err := bc.CopyTaskInfo(template)
+       if err != nil {
+               logrus.Errorf("failed to copy task info from template, template ID: %d, caused by: %v", template.ID, err)
+               return -1, err
+       }
+       bc.saveTaskInfo(taskInfo)
 
-       isInQueue := false
-       if task != nil {
-               logrus.Infof("removed task '%d' from space queue", task.ID)
-               isInQueue = true
+       ok, err := s.QueueTask(taskInfo)
+       if err != nil || !ok {
+               logrus.Errorf("failed to queue task from template, template ID: %d, caused by: %v", template.ID, err)
+               return -1, err
        }
 
-       if isInQueue && !isHeadTask {
-               if err := taskMgr.SetState(taskInfo, 
structure.TaskStateCanceled); err != nil {
-                       return err
-               }
+       logrus.Infof("queued task '%d' from template '%d'", taskInfo.ID, 
template.ID)
 
-               logrus.Infof("set task '%d' to TaskStateCanceled", taskInfo.ID)
-       } else {
-               logrus.Infof("sending task '%d' to task canceler", taskInfo.ID)
-               return s.handleCancelTask(taskInfo)
+       return taskInfo.ID, nil
+}
+
+/*
+* @Description: BatchQueueTask batches the task.
+* @Note: This function will batch the task.
+* @Param taskInfos
+* @Return []bool, []error
+ */
+func (s *ScheduleBl) BatchQueueTask(taskInfos []*structure.TaskInfo) ([]bool, 
[]error) {
+       if len(taskInfos) == 0 {
+               return []bool{}, []error{}
        }
 
-       return nil
-}
+       s.PauseDispatch()
 
-func (s *ScheduleBl) IsDispatchPaused() bool {
-       return s.isDispatchPaused
-}
-func (s *ScheduleBl) PauseDispatch() {
-       s.isDispatchPaused = true
-}
+       defer s.ResumeDispatch()
+       // defer s.Unlock(s.Lock())
 
-func (s *ScheduleBl) ResumeDispatch() {
-       s.isDispatchPaused = false
+       errors := make([]error, len(taskInfos))
+       oks := make([]bool, len(taskInfos))
+
+       for _, taskInfo := range taskInfos {
+               ok, err := s.QueueTask(taskInfo)
+               if err != nil {
+                       logrus.Errorf("failed to queue task '%d': %v", 
taskInfo.ID, err)
+               }
+               errors = append(errors, err)
+               oks = append(oks, ok)
+       }
+
+       return oks, errors
 }
 
-func (s *ScheduleBl) AllTasksInQueue() []*structure.TaskInfo {
-       return s.spaceQueue.AllTasks()
+// ******** CloseCurrent ********
+func (s *ScheduleBl) CloseCurrent(taskId int32, removeWorkerName ...string) 
error {
+       defer s.Unlock(s.Lock())
+
+       // trace tasks need these workers, check if these tasks are available
+       s.taskManager.RemoveTask(taskId)
+       // release the worker group
+       s.resourceManager.ReleaseByTaskID(taskId)
+
+       if len(removeWorkerName) > 0 {
+               // stop the cron job if exists when need remove worker, 
otherwise the task is just closed normally
+               s.cronManager.DeleteTask(taskId)
+               // remove the worker from resource manager
+               workerName := removeWorkerName[0]
+               if workerName == "" {
+                       return errors.New("the argument `removeWorkerName` is empty")
+               }
+               logrus.Infof("removing worker '%s' from resource manager", 
workerName)
+               s.ChangeWorkerStatus(workerName, 
schedules.WorkerOngoingStatusDeleted)
+       }
+
+       logrus.Infof("invoke dispatch when task '%d' is closed", taskId)
+       s.TryScheduleNextTasks(true)
+       return nil
 }
 
-func (s *ScheduleBl) TasksInQueue(space string) []*structure.TaskInfo {
-       return s.spaceQueue.SpaceTasks(space)
+// This will be called when a worker is offline.
+// This will be called when a worker is online.
+func (s *ScheduleBl) ChangeWorkerStatus(workerName string, status 
schedules.WorkerOngoingStatus) (bool, error) {
+       defer s.Unlock(s.Lock())
+       s.resourceManager.ChangeWorkerStatus(workerName, status)
+
+       logrus.Infof("worker '%s' status changed to '%s'", workerName, status)
+       // After changing the worker status, we may need to reschedule tasks
+       s.TryScheduleNextTasks(true)
+
+       return true, nil
 }
 
-func (s *ScheduleBl) CloseCurrent(taskId int32) error {
-       logrus.Infof("invoke dispatch when task '%d' is closed", taskId)
-       s.dispatch()
+// ******** START TASK ********
+func (s *ScheduleBl) waitingStartedTask() {
+       for taskInfo := range s.startChan {
+               if taskInfo == nil {
+                       logrus.Warnf("recieved a nil task from startChan")

Review Comment:
   Spelling error: 'recieved' should be 'received'.
   ```suggestion
                        logrus.Warnf("received a nil task from startChan")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to