imbajin commented on code in PR #336:
URL: 
https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2422717534


##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -222,68 +438,115 @@ func (s *ScheduleBl) startWaitingTask(agent *schedules.Agent, taskInfo *structur
 
        taskInfo.StartTime = time.Now()
        err = taskStarter.StartTask()
+
+       // only for test or debug, record the task start sequence
+       if err := s.taskManager.AddTaskStartSequence(taskInfo.ID); err != nil {
+               logrus.Errorf("failed to add task '%d' to start sequence: %v", taskInfo.ID, err)
+       }
+
        if err != nil {
                logrus.Errorf("failed to start a task, type: %s, taskID: %d, caused by: %v", taskInfo.Type, taskInfo.ID, err)
                taskMgr.SetError(taskInfo, err.Error())
        }
-
 }
 
-func (s *ScheduleBl) dispatch() {
-       defer func() {
-               if err := recover(); err != nil {
-                       logrus.Errorln("dispatch() has been recovered:", err)
+// ********* CANCEL TASK ********
+// handle cancel task
+// need to cancel cron task
+func (s *ScheduleBl) CancelTask(taskInfo *structure.TaskInfo) error {
+       if taskInfo == nil {
+               return errors.New("the argument `taskInfo` is nil")
+       }
+
+       defer s.Unlock(s.Lock())
+
+       isHeadTask := s.taskManager.IsTaskOngoing(taskInfo.ID)
+       task := s.taskManager.RemoveTask(taskInfo.ID)
+       s.cronManager.DeleteTask(taskInfo.ID)
+       // err := s.taskManager.CancelTask(taskInfo)
+       isInQueue := false
+       if task != nil {
+               logrus.Infof("removed task '%d' from space queue", taskInfo.ID)
+               isInQueue = true
+       }
+
+       if isInQueue && !isHeadTask {
+               if err := taskMgr.SetState(taskInfo, structure.TaskStateCanceled); err != nil {
+                       return err
                }
-       }()
 
-       if err := s.doDispatch(); err != nil {
-               logrus.Errorf("do dispatching error:%v", err)
+               logrus.Infof("set task '%d' to TaskStateCanceled", taskInfo.ID)
+       } else {
+               logrus.Infof("sending task '%d' to task canceler", taskInfo.ID)
+               return s.handleCancelTask(taskInfo)
        }
+
+       return nil
 }
 
-func (s *ScheduleBl) doDispatch() error {
-       if s.isDispatchPaused {
-               logrus.Warn("the dispatching was paused")
-               return nil
+func (s *ScheduleBl) handleCancelTask(taskInfo *structure.TaskInfo) error {
+       logrus.Infof("received task '%d' to cancel", taskInfo.ID)
+       canceler, err := NewTaskCanceler(taskInfo)
+       if err != nil {
+               logrus.Errorf("failed to create new TaskCanceler err: %v", err)
+               taskMgr.SetError(taskInfo, err.Error())
+               return err
        }
 
-       defer s.dispatchLocker.Unlock(s.dispatchLocker.Lock())
-
-       buffer := s.spaceQueue.HeadTasks()
-       if len(buffer) == 0 {
-               return nil
+       if err := canceler.CancelTask(); err != nil {
+               logrus.Errorf("failed to cancel task '%d', caused by: %v", taskInfo.ID, err)
+               taskMgr.SetError(taskInfo, err.Error())
+               return err
        }
 
-       for _, task := range buffer {
-               select {
-               case s.startChan <- task:
-               default:
-                       logrus.Warnf("the start channel is full, dropped task: %d", task.ID)
-               }
+       // set worker state to idle or concurrent running
+       s.resourceManager.ReleaseByTaskID(taskInfo.ID)
 
+       return nil
+}
+
+func (s *ScheduleBl) CancelCronTask(taskInfo *structure.TaskInfo) error {
+       if taskInfo == nil {
+               return errors.New("the argument `taskInfo` is nil")
        }
 
+       s.cronManager.DeleteTask(taskInfo.ID)
+
        return nil
 }
 
-func (s *ScheduleBl) waitingTask() {
-       for taskInfo := range s.startChan {
-               if taskInfo == nil {
-                       logrus.Warnf("recieved a nil task from startChan")
-                       return
-               }
+// ** Other Methods **
 
-               logrus.Infof("chan received task '%d' to start", taskInfo.ID)
-               s.handleStartTask(taskInfo)
-       }
+func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
+       return s.taskManager.GetLastTask(space)
 }
 
-func (s *ScheduleBl) startTicker() {
-       // Create a ticker that triggers every 3 seconds
-       ticker := time.Tick(3 * time.Second)
+func (s *ScheduleBl) IsDispatchPaused() bool {
+       // Implement logic to check if dispatching is paused
+       return s.algorithmManager.IsDispatchPaused()
+}
 
-       for range ticker {
-               //logrus.Debug("Ticker ticked")
-               s.dispatch()
-       }
+func (s *ScheduleBl) PauseDispatch() {
+       // Implement logic to pause dispatching
+       s.algorithmManager.PauseDispatch()
+}
+
+func (s *ScheduleBl) ResumeDispatch() {
+       // Implement logic to resume dispatching
+       s.algorithmManager.ResumeDispatch()
+}
+
+func (s *ScheduleBl) AllTasksInQueue() []*structure.TaskInfo {
+       // Implement logic to get all tasks in the queue
+       return s.taskManager.GetAllTasks()
+}
+
+func (s *ScheduleBl) TasksInQueue(space string) []*structure.TaskInfo {
+       // Implement logic to get tasks in the queue for a specific space
+       return s.taskManager.GetTasksInQueue(space)
+}

Review Comment:
   **Debugging code left in production**
   
   The `TaskStartSequence` functionality appears to be for debugging/testing 
only but is included in the production code without proper conditional 
compilation or feature flags.
   
   **Suggested improvements:**
   1. Move it into a separate file that is only compiled into debug builds via a build constraint:
   ```go
   //go:build debug
   
   func (s *ScheduleBl) TaskStartSequence(queryTasks []int32) []*structure.TaskInfo {
       return s.taskManager.GetTaskStartSequence(queryTasks)
   }
   ```
   
   2. Or add a runtime flag check:
   ```go
   func (s *ScheduleBl) TaskStartSequence(queryTasks []int32) []*structure.TaskInfo {
       if !common.GetConfigDefault("enable_debug_features", "false").(bool) {
           logrus.Warn("TaskStartSequence called but debug features are disabled")
           return nil
       }
       return s.taskManager.GetTaskStartSequence(queryTasks)
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to