imbajin commented on code in PR #336:
URL: 
https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2444660211


##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -47,22 +90,135 @@ func (s *ScheduleBl) Init() {
                logrus.Infof("using default start_chan_size: %s", 
defaultChanSizeConfig)
                chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
        }
-       startChan := make(chan *structure.TaskInfo, chanSizeInt)
-       s.startChan = startChan
-       s.spaceQueue = (&schedules.SpaceQueue{}).Init()
-       s.broker = (&schedules.Broker{}).Init()
+       s.startChanSize = chanSizeInt
 
-       go s.waitingTask()
-       go s.startTicker()
+       // tickerInterval
+       const defaultTickerInterval = "3"
+       tickerInterval := common.GetConfigDefault("ticker_interval", 
defaultTickerInterval).(string)
+       tickerIntervalInt, err := strconv.Atoi(tickerInterval)
+       if err != nil {
+               logrus.Errorf("failed to convert ticker_interval to int: %v", 
err)
+               logrus.Infof("using default ticker_interval: %s", 
defaultTickerInterval)
+               tickerIntervalInt, _ = strconv.Atoi(defaultTickerInterval)
+       }
+       s.tickerInterval = tickerIntervalInt
+
+       // softSchedule
+       softSchedule := common.GetConfigDefault("soft_schedule", 
"true").(string)
+       if softSchedule == "true" {
+               s.softSchedule = true
+       } else {
+               s.softSchedule = false
+       }
+
+       logrus.Infof("ScheduleBl configuration: startChanSize=%d, tickerInterval=%d, softSchedule=%v",
+               s.startChanSize, s.tickerInterval, s.softSchedule)
 }
 
-func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
-       return s.spaceQueue.PeekTailTask(space)
+/*
+* @Description: startTicker starts the ticker.
+* @Note: This function will start the ticker.
+ */
+func (s *ScheduleBl) startTicker() {
+       // Create a ticker with the specified interval
+       ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second)
+
+       for range ticker {
+               logrus.Debug("Ticker ticked")
+               s.TryScheduleNextTasks()
+       }
+}
+
+// this make scheduler manager try to schedule next tasks
+/*
+* @Description: TryScheduleNextTasks tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param noLock
+ */
+func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
+       defer func() {
+               if err := recover(); err != nil {
+                       logrus.Errorln("TryScheduleNextTasks() has been recovered:", err)
+               }
+       }()
+
+       if err := s.tryScheduleInner(s.softSchedule, noLock...); err != nil {
+               logrus.Errorf("do scheduling error:%v", err)
+       }
+}
+
+// Main routine to schedule tasks
+/*
+* @Description: tryScheduleInner tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param softSchedule
+* @Param noLock
+ */
+func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error {
+       // Implement logic to get the next task in the queue for the given space
+       if !(len(noLock) > 0 && noLock[0]) {
+               defer s.Unlock(s.Lock())
+       }
+
+       // step 1: make sure all tasks have alloc to a worker group
+       // This is done by the TaskManager, which assigns a worker group to each task
+       s.taskManager.RefreshTaskToWorkerGroupMap()
+
+       // step 2: get available resources and tasks
+       logrus.Debugf("scheduling next tasks, softSchedule: %v", softSchedule)
+       idleWorkerGroups := s.resourceManager.GetIdleWorkerGroups()
+       concurrentWorkerGroups := s.resourceManager.GetConcurrentWorkerGroups()
+       allTasks := s.taskManager.GetAllTasksNotComplete()
+       if len(allTasks) == 0 || (len(idleWorkerGroups) == 0 && 
len(concurrentWorkerGroups) == 0) {
+               logrus.Debugf("no available tasks or workerGroups, allTasks: 
%d, workerGroups: %d/%d",
+                       len(allTasks), len(idleWorkerGroups), 
len(concurrentWorkerGroups))
+               return nil
+       }
+       logrus.Debugf("all tasks: %d, workerGroups: %d/%d", len(allTasks), 
len(idleWorkerGroups), len(concurrentWorkerGroups))
+
+       // TODO: NEED TO JUDGE IF THE TASK CAN CONCURRENTLY RUNNING
+       // NOT only by user setting, but also by scheduler setting

Review Comment:
   **错误处理不一致**: 当 QueueTask 失败时,错误被记录但函数继续执行。这可能导致部分任务入队成功,部分失败,但调用者无法知道具体哪些失败了。
   
   **建议**:
   1. 在遇到第一个错误时考虑是否应该停止并回滚已入队的任务
   2. 或者至少在返回值中明确标识哪些任务失败了
   3. 使用事务性的批量操作来保证原子性
   
   **示例改进**:
   ```go
   // 记录成功和失败的任务
   successTasks := make([]*structure.TaskInfo, 0, len(taskInfos))
   for _, taskInfo := range taskInfos {
       ok, err := s.QueueTask(taskInfo)
       if err != nil {
           // 回滚已成功的任务
           for _, t := range successTasks {
               s.taskManager.RemoveTask(t.ID)
           }
           return false, fmt.Errorf("failed to queue task %d: %w", taskInfo.ID, err)
       }
       successTasks = append(successTasks, taskInfo)
   }
   ```



##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -72,81 +228,161 @@ func (s *ScheduleBl) QueueTask(taskInfo 
*structure.TaskInfo) (bool, error) {
                return false, errors.New("the property `SpaceName` of taskInfo 
is empty")
        }
 
-       //defer s.Unlock(s.Lock())
+       defer s.Unlock(s.Lock())
        if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err 
!= nil {
                return false, err
        }
 
+       logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID, 
taskInfo)

Review Comment:
   **验证不足**: 依赖任务的验证只检查了任务是否存在,但没有检查:
   1. 依赖任务的状态(是否已完成、是否失败等)
   2. 是否存在循环依赖
   3. 依赖链的深度是否合理
   
   **建议**:
   ```go
   for _, depTaskID := range taskInfo.Preorders {
       depTask := taskMgr.GetTaskByID(depTaskID)
       if depTask == nil {
           return false, fmt.Errorf("dependency task %d does not exist", depTaskID)
       }
   
       // 检查依赖任务状态
       if depTask.State == structure.TaskStateFailed {
           return false, fmt.Errorf("dependency task %d has failed", depTaskID)
       }
   
       // 检查循环依赖
       if hasCircularDependency(taskInfo.ID, depTaskID) {
           return false, errors.New("circular dependency detected")
       }
   
       // 检查 space 一致性
       if depTask.SpaceName != taskInfo.SpaceName {
           return false, errors.New("dependency task must be in same space")
       }
   }
   ```



##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -47,22 +90,135 @@ func (s *ScheduleBl) Init() {
                logrus.Infof("using default start_chan_size: %s", 
defaultChanSizeConfig)
                chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
        }
-       startChan := make(chan *structure.TaskInfo, chanSizeInt)
-       s.startChan = startChan
-       s.spaceQueue = (&schedules.SpaceQueue{}).Init()
-       s.broker = (&schedules.Broker{}).Init()
+       s.startChanSize = chanSizeInt
 

Review Comment:
   **配置值硬编码**: 这里使用字符串字面量 "3" 作为默认值,建议定义为常量。
   
   **建议**:
   ```go
   const (
       defaultStartChanSize = "10"
       defaultMaxRunningTasks = "3"
       defaultCheckWaitingInterval = "5"
   )
   
   // 然后在使用时
   maxRunningTasks := options.GetInt(s.Name+".max_running_tasks", defaultMaxRunningTasks)
   ```
   
   这样可以:
   1. 提高代码可维护性
   2. 避免魔法数字
   3. 便于统一修改默认值



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to