Ethereal-O commented on code in PR #336:
URL: 
https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2447144298


##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -72,81 +228,161 @@ func (s *ScheduleBl) QueueTask(taskInfo 
*structure.TaskInfo) (bool, error) {
                return false, errors.New("the property `SpaceName` of taskInfo 
is empty")
        }
 
-       //defer s.Unlock(s.Lock())
+       defer s.Unlock(s.Lock())
        if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err 
!= nil {
                return false, err
        }
 
+       logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID, 
taskInfo)
+
+       // check dependency if exists
+       if len(taskInfo.Preorders) > 0 {
+               for _, depTaskID := range taskInfo.Preorders {
+                       depTask := taskMgr.GetTaskByID(depTaskID)
+                       if depTask == nil {
+                               err := errors.New("the dependency task with ID 
" + strconv.Itoa(int(depTaskID)) + " does not exist")
+                               logrus.Error(err)
+                               taskMgr.SetError(taskInfo, err.Error())
+                               return false, err
+                       }
+               }
+       }
+
        // Notice: Ensure successful invocation.
-       ok, err := s.spaceQueue.PushTask(taskInfo)
+       // make sure all tasks have alloc to a worker group
+       ok, err := s.taskManager.QueueTask(taskInfo)
        if err != nil {
                taskMgr.SetError(taskInfo, err.Error())
                return ok, err
        }
 
-       go s.dispatch()
+       if s.cronManager.CheckCronExpression(taskInfo.CronExpr) == nil {
+               if err := s.cronManager.AddCronTask(taskInfo); err != nil {
+                       logrus.Errorf("failed to add cron task: %v", err)
+                       return false, err
+               }
+               logrus.Infof("added cron task for task '%d' with expression 
'%s'", taskInfo.ID, taskInfo.CronExpr)
+       }
 
        return ok, nil
 }
 
-func (s *ScheduleBl) CancelTask(taskInfo *structure.TaskInfo) error {
-       if taskInfo == nil {
-               return errors.New("the argument `taskInfo` is nil")
+/*
+* @Description: QueueTaskFromTemplate queues the task from the template.
+* @Note: This function will queue the task from the template. This function is 
used by cron tasks.
+* @Param template
+* @Return int32, error
+ */
+func (s *ScheduleBl) QueueTaskFromTemplate(template *structure.TaskInfo) 
(int32, error) {
+       if template == nil {
+               return -1, errors.New("the argument `template` is nil")
        }
 
-       s.Lock()
-       isHeadTask := s.spaceQueue.IsHeadTask(taskInfo.ID)
-       task := s.spaceQueue.RemoveTask(taskInfo.ID)
-       s.Unlock(nil)
+       bc := &baseCreator{}
+       taskInfo, err := bc.CopyTaskInfo(template)
+       if err != nil {
+               logrus.Errorf("failed to copy task info from template, template 
ID: %d, caused by: %v", template.ID, err)
+               return -1, err
+       }
+       bc.saveTaskInfo(taskInfo)
 
-       isInQueue := false
-       if task != nil {
-               logrus.Infof("removed task '%d' from space queue", task.ID)
-               isInQueue = true
+       ok, err := s.QueueTask(taskInfo)
+       if err != nil || !ok {
+               logrus.Errorf("failed to queue task from template, template ID: 
%d, caused by: %v", template.ID, err)
+               return -1, err
        }
 
-       if isInQueue && !isHeadTask {
-               if err := taskMgr.SetState(taskInfo, 
structure.TaskStateCanceled); err != nil {
-                       return err
-               }
+       logrus.Infof("queued task '%d' from template '%d'", taskInfo.ID, 
template.ID)
 
-               logrus.Infof("set task '%d' to TaskStateCanceled", taskInfo.ID)
-       } else {
-               logrus.Infof("sending task '%d' to task canceler", taskInfo.ID)
-               return s.handleCancelTask(taskInfo)
+       return taskInfo.ID, nil
+}
+
+/*
+* @Description: BatchQueueTask batches the task.
+* @Note: This function will batch the task.
+* @Param taskInfos
+* @Return []bool, []error
+ */
+func (s *ScheduleBl) BatchQueueTask(taskInfos []*structure.TaskInfo) ([]bool, 
[]error) {
+       if len(taskInfos) == 0 {
+               return []bool{}, []error{}
        }
 
-       return nil
-}
+       s.PauseDispatch()
 
-func (s *ScheduleBl) IsDispatchPaused() bool {
-       return s.isDispatchPaused
-}
-func (s *ScheduleBl) PauseDispatch() {
-       s.isDispatchPaused = true
-}
+       defer s.ResumeDispatch()
+       // defer s.Unlock(s.Lock())
 
-func (s *ScheduleBl) ResumeDispatch() {
-       s.isDispatchPaused = false
+       errors := make([]error, len(taskInfos))
+       oks := make([]bool, len(taskInfos))
+
+       for _, taskInfo := range taskInfos {
+               ok, err := s.QueueTask(taskInfo)
+               if err != nil {
+                       logrus.Errorf("failed to queue task '%d': %v", 
taskInfo.ID, err)
+               }
+               errors = append(errors, err)
+               oks = append(oks, ok)
+       }
+
+       return oks, errors
 }
 
-func (s *ScheduleBl) AllTasksInQueue() []*structure.TaskInfo {
-       return s.spaceQueue.AllTasks()
+// ******** CloseCurrent ********
+func (s *ScheduleBl) CloseCurrent(taskId int32, removeWorkerName ...string) 
error {
+       defer s.Unlock(s.Lock())
+
+       // trace tasks need these workers, check if these tasks are available
+       s.taskManager.RemoveTask(taskId)
+       // release the worker group
+       s.resourceManager.ReleaseByTaskID(taskId)
+
+       if len(removeWorkerName) > 0 {
+               // stop the cron job if exists when need remove worker, 
otherwise the task is just closed normally
+               s.cronManager.DeleteTask(taskId)
+               // remove the worker from resource manager
+               workerName := removeWorkerName[0]
+               if workerName == "" {
+                       return errors.New("the argument `removeWorkerName` is 
empty")
+               }
+               logrus.Infof("removing worker '%s' from resource manager", 
workerName)
+               s.ChangeWorkerStatus(workerName, 
schedules.WorkerOngoingStatusDeleted)
+       }
+
+       logrus.Infof("invoke dispatch when task '%d' is closed", taskId)
+       s.TryScheduleNextTasks(true)
+       return nil
 }
 
-func (s *ScheduleBl) TasksInQueue(space string) []*structure.TaskInfo {
-       return s.spaceQueue.SpaceTasks(space)
+// This will be called when a worker is offline.
+// This will be called when a worker is online.
+func (s *ScheduleBl) ChangeWorkerStatus(workerName string, status 
schedules.WorkerOngoingStatus) (bool, error) {
+       defer s.Unlock(s.Lock())
+       s.resourceManager.ChangeWorkerStatus(workerName, status)
+
+       logrus.Infof("worker '%s' status changed to '%s'", workerName, status)
+       // After changing the worker status, we may need to reschedule tasks
+       s.TryScheduleNextTasks(true)
+
+       return true, nil
 }

Review Comment:
   handleStartTask only performs its checks and then calls 
s.startWaitingTask(agent, taskInfo) in a goroutine, so there is no problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to