imbajin commented on code in PR #336:
URL:
https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2444644762
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -47,22 +90,135 @@ func (s *ScheduleBl) Init() {
logrus.Infof("using default start_chan_size: %s",
defaultChanSizeConfig)
chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
}
- startChan := make(chan *structure.TaskInfo, chanSizeInt)
- s.startChan = startChan
- s.spaceQueue = (&schedules.SpaceQueue{}).Init()
- s.broker = (&schedules.Broker{}).Init()
+ s.startChanSize = chanSizeInt
- go s.waitingTask()
- go s.startTicker()
+ // tickerInterval
+ const defaultTickerInterval = "3"
+ tickerInterval := common.GetConfigDefault("ticker_interval",
defaultTickerInterval).(string)
+ tickerIntervalInt, err := strconv.Atoi(tickerInterval)
+ if err != nil {
+ logrus.Errorf("failed to convert ticker_interval to int: %v",
err)
+ logrus.Infof("using default ticker_interval: %s",
defaultTickerInterval)
+ tickerIntervalInt, _ = strconv.Atoi(defaultTickerInterval)
+ }
+ s.tickerInterval = tickerIntervalInt
+
+ // softSchedule
+ softSchedule := common.GetConfigDefault("soft_schedule",
"true").(string)
+ if softSchedule == "true" {
+ s.softSchedule = true
+ } else {
+ s.softSchedule = false
+ }
+
+ logrus.Infof("ScheduleBl configuration: startChanSize=%d,
tickerInterval=%d, softSchedule=%v",
+ s.startChanSize, s.tickerInterval, s.softSchedule)
}
-func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
- return s.spaceQueue.PeekTailTask(space)
+/*
+* @Description: startTicker starts the ticker.
+* @Note: This function will start the ticker.
+ */
+func (s *ScheduleBl) startTicker() {
+ // Create a ticker with the specified interval
+ ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second)
+
+ for range ticker {
+ logrus.Debug("Ticker ticked")
+ s.TryScheduleNextTasks()
+ }
+}
+
+// this make scheduler manager try to schedule next tasks
+/*
+* @Description: TryScheduleNextTasks tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param noLock
+ */
+func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
+ defer func() {
+ if err := recover(); err != nil {
+ logrus.Errorln("TryScheduleNextTasks() has been
recovered:", err)
+ }
+ }()
+
+ if err := s.tryScheduleInner(s.softSchedule, noLock...); err != nil {
+ logrus.Errorf("do scheduling error:%v", err)
+ }
+}
+
+// Main routine to schedule tasks
+/*
+* @Description: tryScheduleInner tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param softSchedule
+* @Param noLock
+ */
+func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error
{
+ // Implement logic to get the next task in the queue for the given space
+ if !(len(noLock) > 0 && noLock[0]) {
+ defer s.Unlock(s.Lock())
+ }
+
+ // step 1: make sure all tasks have alloc to a worker group
+ // This is done by the TaskManager, which assigns a worker group to
each task
+ s.taskManager.RefreshTaskToWorkerGroupMap()
+
+ // step 2: get available resources and tasks
+ logrus.Debugf("scheduling next tasks, softSchedule: %v", softSchedule)
+ idleWorkerGroups := s.resourceManager.GetIdleWorkerGroups()
+ concurrentWorkerGroups := s.resourceManager.GetConcurrentWorkerGroups()
+ allTasks := s.taskManager.GetAllTasksNotComplete()
+ if len(allTasks) == 0 || (len(idleWorkerGroups) == 0 &&
len(concurrentWorkerGroups) == 0) {
+ logrus.Debugf("no available tasks or workerGroups, allTasks:
%d, workerGroups: %d/%d",
+ len(allTasks), len(idleWorkerGroups),
len(concurrentWorkerGroups))
+ return nil
+ }
+ logrus.Debugf("all tasks: %d, workerGroups: %d/%d", len(allTasks),
len(idleWorkerGroups), len(concurrentWorkerGroups))
+
+ // TODO: NEED TO JUDGE IF THE TASK CAN CONCURRENTLY RUNNING
+ // NOT only by user setting, but also by scheduler setting
Review Comment:
**错误处理不一致**: 当某个任务入队失败时,错误被记录但函数继续执行。这可能导致部分任务入队成功,部分失败,但调用者无法知道具体哪些失败了。
**建议**:
1. 在遇到第一个错误时考虑是否应该停止并回滚
2. 或者返回更详细的错误信息,包括失败的任务 ID 列表
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]