imbajin commented on code in PR #336:
URL: https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2444659706


##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -47,22 +90,135 @@ func (s *ScheduleBl) Init() {
                logrus.Infof("using default start_chan_size: %s", 
defaultChanSizeConfig)
                chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
        }
-       startChan := make(chan *structure.TaskInfo, chanSizeInt)
-       s.startChan = startChan
-       s.spaceQueue = (&schedules.SpaceQueue{}).Init()
-       s.broker = (&schedules.Broker{}).Init()
+       s.startChanSize = chanSizeInt
 
-       go s.waitingTask()
-       go s.startTicker()
+       // tickerInterval
+       const defaultTickerInterval = "3"
+       tickerInterval := common.GetConfigDefault("ticker_interval", 
defaultTickerInterval).(string)
+       tickerIntervalInt, err := strconv.Atoi(tickerInterval)
+       if err != nil {
+               logrus.Errorf("failed to convert ticker_interval to int: %v", 
err)
+               logrus.Infof("using default ticker_interval: %s", 
defaultTickerInterval)
+               tickerIntervalInt, _ = strconv.Atoi(defaultTickerInterval)
+       }
+       s.tickerInterval = tickerIntervalInt
+
+       // softSchedule
+       softSchedule := common.GetConfigDefault("soft_schedule", 
"true").(string)
+       if softSchedule == "true" {
+               s.softSchedule = true
+       } else {
+               s.softSchedule = false
+       }
+
+       logrus.Infof("ScheduleBl configuration: startChanSize=%d, 
tickerInterval=%d, softSchedule=%v",
+               s.startChanSize, s.tickerInterval, s.softSchedule)
 }
 
-func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
-       return s.spaceQueue.PeekTailTask(space)
+/*
+* @Description: startTicker starts the ticker.
+* @Note: This function will start the ticker.
+ */
+func (s *ScheduleBl) startTicker() {
+       // Create a ticker with the specified interval
+       ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second)
+
+       for range ticker {
+               logrus.Debug("Ticker ticked")
+               s.TryScheduleNextTasks()
+       }
+}
+
+// this make scheduler manager try to schedule next tasks
+/*
+* @Description: TryScheduleNextTasks tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param noLock
+ */
+func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
+       defer func() {
+               if err := recover(); err != nil {
+                       logrus.Errorln("TryScheduleNextTasks() has been 
recovered:", err)
+               }
+       }()
+
+       if err := s.tryScheduleInner(s.softSchedule, noLock...); err != nil {
+               logrus.Errorf("do scheduling error:%v", err)
+       }
+}
+
+// Main routine to schedule tasks
+/*
+* @Description: tryScheduleInner tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param softSchedule
+* @Param noLock
+ */
+func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error 
{
+       // Implement logic to get the next task in the queue for the given space
+       if !(len(noLock) > 0 && noLock[0]) {
+               defer s.Unlock(s.Lock())
+       }
+
+       // step 1: make sure all tasks have alloc to a worker group
+       // This is done by the TaskManager, which assigns a worker group to 
each task
+       s.taskManager.RefreshTaskToWorkerGroupMap()
+
+       // step 2: get available resources and tasks
+       logrus.Debugf("scheduling next tasks, softSchedule: %v", softSchedule)
+       idleWorkerGroups := s.resourceManager.GetIdleWorkerGroups()
+       concurrentWorkerGroups := s.resourceManager.GetConcurrentWorkerGroups()
+       allTasks := s.taskManager.GetAllTasksNotComplete()
+       if len(allTasks) == 0 || (len(idleWorkerGroups) == 0 && 
len(concurrentWorkerGroups) == 0) {
+               logrus.Debugf("no available tasks or workerGroups, allTasks: 
%d, workerGroups: %d/%d",
+                       len(allTasks), len(idleWorkerGroups), 
len(concurrentWorkerGroups))
+               return nil
+       }

Review Comment:
   **严重的逻辑错误**：对已按长度预分配（`make([]T, n)`）的 slice 再使用 `append` 追加元素，会导致返回错误的结果。
   
   **当前问题**:
   ```go
   errors := make([]error, len(taskInfos))  // 创建 len 个 nil 元素
   oks := make([]bool, len(taskInfos))      // 创建 len 个 false 元素
   // ... 循环中
   errors = append(errors, err)  // 追加到末尾,导致 2*len 个元素
   oks = append(oks, ok)
   ```
   
   **结果**：返回的 slice 长度变为 2*len，前 len 个元素都是零值（`nil` / `false`），真正的结果位于后 len 个位置，因此第 i 个元素不再对应 `taskInfos[i]`。
   
   **修复方案 1 - 长度为 0、仅预留容量（循环中继续使用 `append`）**：
   ```go
   errors := make([]error, 0, len(taskInfos))
   oks := make([]bool, 0, len(taskInfos))
   ```
   
   **修复方案 2 - 保留按长度预分配（`make([]T, len(taskInfos))`），循环中改用索引赋值**：
   ```go
   for i, taskInfo := range taskInfos {
       ok, err := s.QueueTask(taskInfo)
       errors[i] = err
       oks[i] = ok
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to