imbajin commented on code in PR #336:
URL:
https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2422651658
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -47,22 +89,133 @@ func (s *ScheduleBl) Init() {
logrus.Infof("using default start_chan_size: %s",
defaultChanSizeConfig)
chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
}
- startChan := make(chan *structure.TaskInfo, chanSizeInt)
- s.startChan = startChan
- s.spaceQueue = (&schedules.SpaceQueue{}).Init()
- s.broker = (&schedules.Broker{}).Init()
+ s.startChanSize = chanSizeInt
- go s.waitingTask()
- go s.startTicker()
+ // tickerInterval
+ const defaultTickerInterval = "3"
+ tickerInterval := common.GetConfigDefault("ticker_interval",
defaultTickerInterval).(string)
+ tickerIntervalInt, err := strconv.Atoi(tickerInterval)
+ if err != nil {
+ logrus.Errorf("failed to convert ticker_interval to int: %v",
err)
+ logrus.Infof("using default ticker_interval: %s",
defaultTickerInterval)
+ tickerIntervalInt, _ = strconv.Atoi(defaultTickerInterval)
+ }
+ s.tickerInterval = tickerIntervalInt
+
+ // softSchedule
+ softSchedule := common.GetConfigDefault("soft_schedule",
"true").(string)
+ if softSchedule == "true" {
+ s.softSchedule = true
+ } else {
+ s.softSchedule = false
+ }
+
+ logrus.Infof("ScheduleBl configuration: startChanSize=%d, tickerInterval=%d, softSchedule=%v",
+ s.startChanSize, s.tickerInterval, s.softSchedule)
}
-func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
- return s.spaceQueue.PeekTailTask(space)
+/*
+* @Description: startTicker starts the ticker.
+* @Note: This function will start the ticker.
+ */
+func (s *ScheduleBl) startTicker() {
+ // Create a ticker with the specified interval
+ ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second)
+
+ for range ticker {
+ logrus.Debug("Ticker ticked")
+ s.TryScheduleNextTasks()
+ }
+}
+
+// this make scheduler manager try to schedule next tasks
+/*
+* @Description: TryScheduleNextTasks tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param noLock
+ */
+func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
+ defer func() {
+ if err := recover(); err != nil {
+ logrus.Errorln("TryScheduleNextTasks() has been recovered:", err)
+ }
+ }()
+
+ if err := s.tryScheduleInner(s.softSchedule, noLock...); err != nil {
+ logrus.Errorf("do scheduling error:%v", err)
+ }
+}
+
+// Main routine to schedule tasks
+/*
+* @Description: tryScheduleInner tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param softSchedule
+* @Param noLock
+ */
+func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error {
+ // Implement logic to get the next task in the queue for the given space
+ if !(len(noLock) > 0 && noLock[0]) {
+ defer s.Unlock(s.Lock())
+ }
+
+ // step 1: make sure all tasks have alloc to a worker group
+ // This is done by the TaskManager, which assigns a worker group to each task
+ s.taskManager.RefreshTaskToWorkerGroupMap()
+
+ // step 2: get available resources and tasks
+ logrus.Debugf("scheduling next tasks, softSchedule: %v", softSchedule)
+ idleWorkerGroups := s.resourceManager.GetIdleWorkerGroups()
+ concurrentWorkerGroups := s.resourceManager.GetConcurrentWorkerGroups()
+ allTasks := s.taskManager.GetAllTasksNotComplete()
+ if len(allTasks) == 0 || (len(idleWorkerGroups) == 0 &&
len(concurrentWorkerGroups) == 0) {
+ logrus.Debugf("no available tasks or workerGroups, allTasks: %d, workerGroups: %d/%d",
+ len(allTasks), len(idleWorkerGroups),
len(concurrentWorkerGroups))
+ return nil
+ }
+ logrus.Debugf("all tasks: %d, workerGroups: %d/%d", len(allTasks),
len(idleWorkerGroups), len(concurrentWorkerGroups))
+
+ // TODO: NEED TO JUDGE IF THE TASK CAN CONCURRENTLY RUNNING
+ // NOT only by user setting, but also by scheduler setting
+
+ // step 3: return the task with the highest priority or small tasks which can be executed immediately
+ taskToWorkerGroupMap := s.taskManager.GetTaskToWorkerGroupMap()
+ nextTasks, err := s.algorithmManager.ScheduleNextTasks(allTasks,
taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule)
+ if err != nil {
+ logrus.Errorf("failed to schedule next tasks: %v", err)
+ return err
+ }
+ logrus.Debugf("scheduled %d tasks", len(nextTasks))
+ // step 4: send to start channel
+ for _, task := range nextTasks {
+ if task == nil {
+ logrus.Warnf("received a nil task from algorithm manager")
+ continue
Review Comment:
**Missing error handling for channel operations**
If the start channel is full, the task is dropped with only a warning. This
could lead to tasks never being executed without clear notification to the user.
**Suggested fix:**
```go
for _, task := range nextTasks {
if task == nil {
logrus.Warnf("received a nil task from algorithm manager")
continue
}
if task.State != structure.TaskStateWaiting {
logrus.Warnf("task '%d' is not in waiting state, current state: %s",
task.ID, task.State)
continue
}
logrus.Infof("scheduling task '%d' with type '%s' to start channel",
task.ID, task.Type)
select {
case s.startChan <- task:
logrus.Infof("task '%d' sent to start channel", task.ID)
default:
// Critical: Set task to error state instead of silently dropping
errMsg := fmt.Sprintf("start channel is full, cannot schedule task %d", task.ID)
logrus.Errorf(errMsg)
taskMgr.SetError(task, errMsg)
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]