imbajin commented on code in PR #336:
URL:
https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2422715999
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -72,81 +225,161 @@ func (s *ScheduleBl) QueueTask(taskInfo
*structure.TaskInfo) (bool, error) {
return false, errors.New("the property `SpaceName` of taskInfo
is empty")
}
- //defer s.Unlock(s.Lock())
+ defer s.Unlock(s.Lock())
if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err
!= nil {
return false, err
}
+ logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID,
taskInfo)
+
+ // check dependency if exists
+ if len(taskInfo.Preorders) > 0 {
+ for _, depTaskID := range taskInfo.Preorders {
+ depTask := taskMgr.GetTaskByID(depTaskID)
+ if depTask == nil {
+ err := errors.New("the dependency task with ID
" + strconv.Itoa(int(depTaskID)) + " does not exist")
+ logrus.Error(err)
+ taskMgr.SetError(taskInfo, err.Error())
+ return false, err
+ }
+ }
+ }
+
// Notice: Ensure successful invocation.
- ok, err := s.spaceQueue.PushTask(taskInfo)
+ // make sure all tasks have alloc to a worker group
+ ok, err := s.taskManager.QueueTask(taskInfo)
if err != nil {
taskMgr.SetError(taskInfo, err.Error())
return ok, err
}
- go s.dispatch()
+ if s.cronManager.CheckCronExpression(taskInfo.CronExpr) == nil {
+ if err := s.cronManager.AddCronTask(taskInfo); err != nil {
+ logrus.Errorf("failed to add cron task: %v", err)
+ return false, err
+ }
+ logrus.Infof("added cron task for task '%d' with expression
'%s'", taskInfo.ID, taskInfo.CronExpr)
+ }
return ok, nil
}
-func (s *ScheduleBl) CancelTask(taskInfo *structure.TaskInfo) error {
- if taskInfo == nil {
- return errors.New("the argument `taskInfo` is nil")
+/*
+* @Description: QueueTaskFromTemplate queues the task from the template.
+* @Note: This function will queue the task from the template. This function is
used by cron tasks.
+* @Param template
+* @Return int32, error
+ */
+func (s *ScheduleBl) QueueTaskFromTemplate(template *structure.TaskInfo)
(int32, error) {
+ if template == nil {
+ return -1, errors.New("the argument `template` is nil")
}
- s.Lock()
- isHeadTask := s.spaceQueue.IsHeadTask(taskInfo.ID)
- task := s.spaceQueue.RemoveTask(taskInfo.ID)
- s.Unlock(nil)
+ bc := &baseCreator{}
+ taskInfo, err := bc.CopyTaskInfo(template)
+ if err != nil {
+ logrus.Errorf("failed to copy task info from template, template
ID: %d, caused by: %v", template.ID, err)
+ return -1, err
+ }
+ bc.saveTaskInfo(taskInfo)
- isInQueue := false
- if task != nil {
- logrus.Infof("removed task '%d' from space queue", task.ID)
- isInQueue = true
+ ok, err := s.QueueTask(taskInfo)
+ if err != nil || !ok {
+ logrus.Errorf("failed to queue task from template, template ID:
%d, caused by: %v", template.ID, err)
+ return -1, err
}
- if isInQueue && !isHeadTask {
- if err := taskMgr.SetState(taskInfo,
structure.TaskStateCanceled); err != nil {
- return err
- }
+ logrus.Infof("queued task '%d' from template '%d'", taskInfo.ID,
template.ID)
- logrus.Infof("set task '%d' to TaskStateCanceled", taskInfo.ID)
- } else {
- logrus.Infof("sending task '%d' to task canceler", taskInfo.ID)
- return s.handleCancelTask(taskInfo)
+ return taskInfo.ID, nil
+}
+
+/*
+* @Description: BatchQueueTask batches the task.
+* @Note: This function will batch the task.
+* @Param taskInfos
+* @Return []bool, []error
+ */
+func (s *ScheduleBl) BatchQueueTask(taskInfos []*structure.TaskInfo) ([]bool,
[]error) {
+ if len(taskInfos) == 0 {
+ return []bool{}, []error{}
}
- return nil
-}
+ s.PauseDispatch()
Review Comment:
**Silent failures in batch operations**
The batch operation returns arrays of results but there's no clear
indication of which operations failed. This makes error handling difficult for
callers.
**Suggested improvement:**
```go
type BatchQueueResult struct {
TaskID int32
Success bool
Error error
}
func (s *ScheduleBl) BatchQueueTask(taskInfos []*structure.TaskInfo)
[]*BatchQueueResult {
if len(taskInfos) == 0 {
return []*BatchQueueResult{}
}
s.PauseDispatch()
defer s.ResumeDispatch()
results := make([]*BatchQueueResult, 0, len(taskInfos))
for _, taskInfo := range taskInfos {
ok, err := s.QueueTask(taskInfo)
results = append(results, &BatchQueueResult{
TaskID: taskInfo.ID,
Success: ok,
Error: err,
})
if err != nil {
logrus.Errorf("failed to queue task '%d': %v", taskInfo.ID, err)
}
}
return results
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]