imbajin commented on code in PR #336:
URL:
https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2444643895
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -222,68 +441,115 @@ func (s *ScheduleBl) startWaitingTask(agent
*schedules.Agent, taskInfo *structur
taskInfo.StartTime = time.Now()
err = taskStarter.StartTask()
+
+ // only for test or debug, record the task start sequence
Review Comment:
**缺少错误处理**: 记录任务启动顺序这一步返回的错误只是被记录,但没有适当的处理。如果这是关键功能,应该考虑是否需要回滚或重试。
**建议**:
- 如果该记录调用仅用于调试/测试,应该在注释中明确说明失败不会影响正常功能
- 考虑在测试/调试模式下才调用此方法
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -47,22 +90,135 @@ func (s *ScheduleBl) Init() {
logrus.Infof("using default start_chan_size: %s",
defaultChanSizeConfig)
chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
}
- startChan := make(chan *structure.TaskInfo, chanSizeInt)
- s.startChan = startChan
- s.spaceQueue = (&schedules.SpaceQueue{}).Init()
- s.broker = (&schedules.Broker{}).Init()
+ s.startChanSize = chanSizeInt
- go s.waitingTask()
- go s.startTicker()
+ // tickerInterval
+ const defaultTickerInterval = "3"
+ tickerInterval := common.GetConfigDefault("ticker_interval",
defaultTickerInterval).(string)
+ tickerIntervalInt, err := strconv.Atoi(tickerInterval)
+ if err != nil {
+ logrus.Errorf("failed to convert ticker_interval to int: %v",
err)
+ logrus.Infof("using default ticker_interval: %s",
defaultTickerInterval)
+ tickerIntervalInt, _ = strconv.Atoi(defaultTickerInterval)
+ }
+ s.tickerInterval = tickerIntervalInt
+
+ // softSchedule
+ softSchedule := common.GetConfigDefault("soft_schedule",
"true").(string)
+ if softSchedule == "true" {
+ s.softSchedule = true
+ } else {
+ s.softSchedule = false
+ }
+
+ logrus.Infof("ScheduleBl configuration: startChanSize=%d,
tickerInterval=%d, softSchedule=%v",
+ s.startChanSize, s.tickerInterval, s.softSchedule)
}
-func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
- return s.spaceQueue.PeekTailTask(space)
+/*
+* @Description: startTicker starts the ticker.
+* @Note: This function will start the ticker.
+ */
+func (s *ScheduleBl) startTicker() {
+ // Create a ticker with the specified interval
+ ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second)
+
+ for range ticker {
+ logrus.Debug("Ticker ticked")
+ s.TryScheduleNextTasks()
+ }
+}
+
+// this make scheduler manager try to schedule next tasks
+/*
+* @Description: TryScheduleNextTasks tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param noLock
+ */
+func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
+ defer func() {
+ if err := recover(); err != nil {
+ logrus.Errorln("TryScheduleNextTasks() has been
recovered:", err)
+ }
+ }()
+
+ if err := s.tryScheduleInner(s.softSchedule, noLock...); err != nil {
+ logrus.Errorf("do scheduling error:%v", err)
+ }
+}
+
+// Main routine to schedule tasks
+/*
+* @Description: tryScheduleInner tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param softSchedule
+* @Param noLock
+ */
+func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error
{
+ // Implement logic to get the next task in the queue for the given space
+ if !(len(noLock) > 0 && noLock[0]) {
+ defer s.Unlock(s.Lock())
+ }
+
+ // step 1: make sure all tasks have alloc to a worker group
+ // This is done by the TaskManager, which assigns a worker group to
each task
+ s.taskManager.RefreshTaskToWorkerGroupMap()
+
+ // step 2: get available resources and tasks
+ logrus.Debugf("scheduling next tasks, softSchedule: %v", softSchedule)
+ idleWorkerGroups := s.resourceManager.GetIdleWorkerGroups()
+ concurrentWorkerGroups := s.resourceManager.GetConcurrentWorkerGroups()
+ allTasks := s.taskManager.GetAllTasksNotComplete()
+ if len(allTasks) == 0 || (len(idleWorkerGroups) == 0 &&
len(concurrentWorkerGroups) == 0) {
+ logrus.Debugf("no available tasks or workerGroups, allTasks:
%d, workerGroups: %d/%d",
+ len(allTasks), len(idleWorkerGroups),
len(concurrentWorkerGroups))
+ return nil
+ }
Review Comment:
**逻辑错误**: 这里使用 `append` 添加到预分配的 slice,会导致前面的 `make` 创建的空元素被保留。
**当前代码问题**:
```go
errors := make([]error, len(taskInfos)) // 创建了 len 个 nil 元素
oks := make([]bool, len(taskInfos)) // 创建了 len 个 false 元素
// ... 循环中
errors = append(errors, err) // 会追加到末尾,导致返回 2*len 个元素
oks = append(oks, ok)
```
**修复方案**:
```go
errors := make([]error, 0, len(taskInfos)) // 使用容量而不是长度
oks := make([]bool, 0, len(taskInfos))
```
或者使用索引赋值:
```go
for i, taskInfo := range taskInfos {
ok, err := s.QueueTask(taskInfo)
errors[i] = err
oks[i] = ok
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]