imbajin commented on code in PR #336:
URL:
https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2422716580
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -47,22 +89,133 @@ func (s *ScheduleBl) Init() {
logrus.Infof("using default start_chan_size: %s",
defaultChanSizeConfig)
chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
}
- startChan := make(chan *structure.TaskInfo, chanSizeInt)
- s.startChan = startChan
- s.spaceQueue = (&schedules.SpaceQueue{}).Init()
- s.broker = (&schedules.Broker{}).Init()
+ s.startChanSize = chanSizeInt
- go s.waitingTask()
- go s.startTicker()
+ // tickerInterval
+ const defaultTickerInterval = "3"
+ tickerInterval := common.GetConfigDefault("ticker_interval",
defaultTickerInterval).(string)
+ tickerIntervalInt, err := strconv.Atoi(tickerInterval)
+ if err != nil {
+ logrus.Errorf("failed to convert ticker_interval to int: %v",
err)
+ logrus.Infof("using default ticker_interval: %s",
defaultTickerInterval)
+ tickerIntervalInt, _ = strconv.Atoi(defaultTickerInterval)
+ }
+ s.tickerInterval = tickerIntervalInt
+
+ // softSchedule
+ softSchedule := common.GetConfigDefault("soft_schedule",
"true").(string)
+ if softSchedule == "true" {
+ s.softSchedule = true
+ } else {
+ s.softSchedule = false
+ }
+
+ logrus.Infof("ScheduleBl configuration: startChanSize=%d,
tickerInterval=%d, softSchedule=%v",
+ s.startChanSize, s.tickerInterval, s.softSchedule)
}
-func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
- return s.spaceQueue.PeekTailTask(space)
+/*
+* @Description: startTicker starts the ticker.
+* @Note: This function will start the ticker.
+ */
+func (s *ScheduleBl) startTicker() {
+ // Create a ticker with the specified interval
+ ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second)
+
+ for range ticker {
+ logrus.Debug("Ticker ticked")
+ s.TryScheduleNextTasks()
+ }
+}
+
+// this make scheduler manager try to schedule next tasks
+/*
+* @Description: TryScheduleNextTasks tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param noLock
+ */
+func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
+ defer func() {
+ if err := recover(); err != nil {
+ logrus.Errorln("TryScheduleNextTasks() has been
recovered:", err)
+ }
+ }()
+
+ if err := s.tryScheduleInner(s.softSchedule, noLock...); err != nil {
+ logrus.Errorf("do scheduling error:%v", err)
+ }
+}
+
+// Main routine to schedule tasks
+/*
+* @Description: tryScheduleInner tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param softSchedule
+* @Param noLock
+ */
+func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error
{
+ // Implement logic to get the next task in the queue for the given space
+ if !(len(noLock) > 0 && noLock[0]) {
+ defer s.Unlock(s.Lock())
+ }
+
+ // step 1: make sure all tasks have alloc to a worker group
+ // This is done by the TaskManager, which assigns a worker group to
each task
+ s.taskManager.RefreshTaskToWorkerGroupMap()
+
+ // step 2: get available resources and tasks
+ logrus.Debugf("scheduling next tasks, softSchedule: %v", softSchedule)
+ idleWorkerGroups := s.resourceManager.GetIdleWorkerGroups()
+ concurrentWorkerGroups := s.resourceManager.GetConcurrentWorkerGroups()
+ allTasks := s.taskManager.GetAllTasksNotComplete()
+ if len(allTasks) == 0 || (len(idleWorkerGroups) == 0 &&
len(concurrentWorkerGroups) == 0) {
+ logrus.Debugf("no available tasks or workerGroups, allTasks:
%d, workerGroups: %d/%d",
+ len(allTasks), len(idleWorkerGroups),
len(concurrentWorkerGroups))
+ return nil
+ }
+ logrus.Debugf("all tasks: %d, workerGroups: %d/%d", len(allTasks),
len(idleWorkerGroups), len(concurrentWorkerGroups))
+
Review Comment:
**TODO comment needs resolution**
This TODO indicates an important design decision that's unresolved. The
scheduler needs to determine if tasks can run concurrently based on both user
settings AND scheduler policies.
**Action required (either option 1, or options 2 and 3 together):**
1. Implement the logic to check concurrent execution capability, or
2. Create a tracking issue for this work and reference it in the TODO comment, and
3. Document the temporary behavior and its limitations
Example:
```go
// TODO(issue #XXX): Implement comprehensive concurrent execution check
// Currently only checks user settings. Need to add:
// 1. Resource availability check
// 2. Conflict detection with running tasks
// 3. Worker group capacity limits
// Temporary behavior: Tasks marked as non-exclusive may still be blocked
// if resources are insufficient
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]