imbajin commented on code in PR #336:
URL: 
https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2444647531


##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -72,81 +228,161 @@ func (s *ScheduleBl) QueueTask(taskInfo 
*structure.TaskInfo) (bool, error) {
                return false, errors.New("the property `SpaceName` of taskInfo 
is empty")
        }
 
-       //defer s.Unlock(s.Lock())
+       defer s.Unlock(s.Lock())
        if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err 
!= nil {
                return false, err
        }
 
+       logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID, 
taskInfo)
+
+       // check dependency if exists
+       if len(taskInfo.Preorders) > 0 {
+               for _, depTaskID := range taskInfo.Preorders {
+                       depTask := taskMgr.GetTaskByID(depTaskID)
+                       if depTask == nil {
+                               err := errors.New("the dependency task with ID 
" + strconv.Itoa(int(depTaskID)) + " does not exist")
+                               logrus.Error(err)
+                               taskMgr.SetError(taskInfo, err.Error())
+                               return false, err
+                       }
+               }
+       }
+
        // Notice: Ensure successful invocation.
-       ok, err := s.spaceQueue.PushTask(taskInfo)
+       // make sure all tasks have alloc to a worker group
+       ok, err := s.taskManager.QueueTask(taskInfo)
        if err != nil {
                taskMgr.SetError(taskInfo, err.Error())
                return ok, err
        }
 
-       go s.dispatch()
+       if s.cronManager.CheckCronExpression(taskInfo.CronExpr) == nil {
+               if err := s.cronManager.AddCronTask(taskInfo); err != nil {
+                       logrus.Errorf("failed to add cron task: %v", err)
+                       return false, err
+               }
+               logrus.Infof("added cron task for task '%d' with expression 
'%s'", taskInfo.ID, taskInfo.CronExpr)
+       }
 
        return ok, nil
 }
 
-func (s *ScheduleBl) CancelTask(taskInfo *structure.TaskInfo) error {
-       if taskInfo == nil {
-               return errors.New("the argument `taskInfo` is nil")
+/*
+* @Description: QueueTaskFromTemplate queues the task from the template.
+* @Note: This function will queue the task from the template. This function is 
used by cron tasks.
+* @Param template
+* @Return int32, error
+ */
+func (s *ScheduleBl) QueueTaskFromTemplate(template *structure.TaskInfo) 
(int32, error) {
+       if template == nil {
+               return -1, errors.New("the argument `template` is nil")
        }
 
-       s.Lock()
-       isHeadTask := s.spaceQueue.IsHeadTask(taskInfo.ID)
-       task := s.spaceQueue.RemoveTask(taskInfo.ID)
-       s.Unlock(nil)
+       bc := &baseCreator{}
+       taskInfo, err := bc.CopyTaskInfo(template)
+       if err != nil {
+               logrus.Errorf("failed to copy task info from template, template 
ID: %d, caused by: %v", template.ID, err)
+               return -1, err
+       }
+       bc.saveTaskInfo(taskInfo)
 
-       isInQueue := false
-       if task != nil {
-               logrus.Infof("removed task '%d' from space queue", task.ID)
-               isInQueue = true
+       ok, err := s.QueueTask(taskInfo)
+       if err != nil || !ok {
+               logrus.Errorf("failed to queue task from template, template ID: 
%d, caused by: %v", template.ID, err)
+               return -1, err
        }
 
-       if isInQueue && !isHeadTask {
-               if err := taskMgr.SetState(taskInfo, 
structure.TaskStateCanceled); err != nil {
-                       return err
-               }
+       logrus.Infof("queued task '%d' from template '%d'", taskInfo.ID, 
template.ID)
 
-               logrus.Infof("set task '%d' to TaskStateCanceled", taskInfo.ID)
-       } else {
-               logrus.Infof("sending task '%d' to task canceler", taskInfo.ID)
-               return s.handleCancelTask(taskInfo)
+       return taskInfo.ID, nil
+}
+
+/*
+* @Description: BatchQueueTask batches the task.
+* @Note: This function will batch the task.
+* @Param taskInfos
+* @Return []bool, []error
+ */
+func (s *ScheduleBl) BatchQueueTask(taskInfos []*structure.TaskInfo) ([]bool, 
[]error) {
+       if len(taskInfos) == 0 {
+               return []bool{}, []error{}
        }
 
-       return nil
-}
+       s.PauseDispatch()
 
-func (s *ScheduleBl) IsDispatchPaused() bool {
-       return s.isDispatchPaused
-}
-func (s *ScheduleBl) PauseDispatch() {
-       s.isDispatchPaused = true
-}
+       defer s.ResumeDispatch()
+       // defer s.Unlock(s.Lock())
 
-func (s *ScheduleBl) ResumeDispatch() {
-       s.isDispatchPaused = false
+       errors := make([]error, len(taskInfos))
+       oks := make([]bool, len(taskInfos))
+
+       for _, taskInfo := range taskInfos {
+               ok, err := s.QueueTask(taskInfo)
+               if err != nil {
+                       logrus.Errorf("failed to queue task '%d': %v", 
taskInfo.ID, err)
+               }
+               errors = append(errors, err)
+               oks = append(oks, ok)
+       }
+
+       return oks, errors
 }
 
-func (s *ScheduleBl) AllTasksInQueue() []*structure.TaskInfo {
-       return s.spaceQueue.AllTasks()
+// ******** CloseCurrent ********
+func (s *ScheduleBl) CloseCurrent(taskId int32, removeWorkerName ...string) 
error {
+       defer s.Unlock(s.Lock())
+
+       // trace tasks need these workers, check if these tasks are available
+       s.taskManager.RemoveTask(taskId)
+       // release the worker group
+       s.resourceManager.ReleaseByTaskID(taskId)
+
+       if len(removeWorkerName) > 0 {
+               // stop the cron job if exists when need remove worker, 
otherwise the task is just closed normally
+               s.cronManager.DeleteTask(taskId)
+               // remove the worker from resource manager
+               workerName := removeWorkerName[0]
+               if workerName == "" {
+                       return errors.New("the argument `removeWorkerName` is 
empty")
+               }
+               logrus.Infof("removing worker '%s' from resource manager", 
workerName)
+               s.ChangeWorkerStatus(workerName, 
schedules.WorkerOngoingStatusDeleted)
+       }
+
+       logrus.Infof("invoke dispatch when task '%d' is closed", taskId)
+       s.TryScheduleNextTasks(true)
+       return nil
 }
 
-func (s *ScheduleBl) TasksInQueue(space string) []*structure.TaskInfo {
-       return s.spaceQueue.SpaceTasks(space)
+// This will be called when a worker is offline.
+// This will be called when a worker is online.
+func (s *ScheduleBl) ChangeWorkerStatus(workerName string, status 
schedules.WorkerOngoingStatus) (bool, error) {
+       defer s.Unlock(s.Lock())
+       s.resourceManager.ChangeWorkerStatus(workerName, status)
+
+       logrus.Infof("worker '%s' status changed to '%s'", workerName, status)
+       // After changing the worker status, we may need to reschedule tasks
+       s.TryScheduleNextTasks(true)
+
+       return true, nil
 }

Review Comment:
   **线程安全问题**: 从 channel 接收任务后直接调用 `handleStartTask`,如果 `handleStartTask` 执行时间较长,可能会阻塞其他任务的接收。
   
   **建议**:
   ```go
   // waitingStartedTask receives tasks from s.startChan until the channel is
   // closed and hands each one to s.handleStartTask. Nil entries are logged
   // with a warning and skipped.
   func (s *ScheduleBl) waitingStartedTask() {
       for taskInfo := range s.startChan {
           if taskInfo == nil {
               logrus.Warnf("received a nil task from startChan")
               continue
           }
           
           logrus.Infof("chan received task '%d' to start", taskInfo.ID)
           // Handle the task in its own goroutine so the channel receive loop is not blocked.
           go s.handleStartTask(taskInfo)
       }
   }
   ```
   
   这样可以提高并发性能。



##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -72,81 +228,161 @@ func (s *ScheduleBl) QueueTask(taskInfo 
*structure.TaskInfo) (bool, error) {
                return false, errors.New("the property `SpaceName` of taskInfo 
is empty")
        }
 
-       //defer s.Unlock(s.Lock())
+       defer s.Unlock(s.Lock())
        if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err 
!= nil {
                return false, err
        }
 
+       logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID, 
taskInfo)
+
+       // check dependency if exists
+       if len(taskInfo.Preorders) > 0 {
+               for _, depTaskID := range taskInfo.Preorders {
+                       depTask := taskMgr.GetTaskByID(depTaskID)
+                       if depTask == nil {
+                               err := errors.New("the dependency task with ID 
" + strconv.Itoa(int(depTaskID)) + " does not exist")
+                               logrus.Error(err)
+                               taskMgr.SetError(taskInfo, err.Error())
+                               return false, err
+                       }
+               }
+       }
+
        // Notice: Ensure successful invocation.
-       ok, err := s.spaceQueue.PushTask(taskInfo)
+       // make sure all tasks have alloc to a worker group
+       ok, err := s.taskManager.QueueTask(taskInfo)
        if err != nil {
                taskMgr.SetError(taskInfo, err.Error())
                return ok, err
        }
 

Review Comment:
   **资源泄漏风险**: 如果 cron 任务调度成功但后续 `QueueTask` 失败,cron 任务不会被清理。
   
   **建议**:
   ```go
   if s.cronManager.CheckCronExpression(taskInfo.CronExpr) == nil {
       // 先队列任务,成功后再添加 cron
       ok, err := s.taskManager.QueueTask(taskInfo)
       if err != nil {
           taskMgr.SetError(taskInfo, err.Error())
           return ok, err
       }
       
       if err := s.cronManager.AddCronTask(taskInfo); err != nil {
           logrus.Errorf("failed to add cron task: %v", err)
           // 清理已队列的任务
           s.taskManager.RemoveTask(taskInfo.ID)
           return false, err
       }
   }
   ```
   
   或者使用事务性操作来确保原子性。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to