imbajin commented on code in PR #336:
URL:
https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2444649666
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -202,6 +420,7 @@ func (s *ScheduleBl) startWaitingTask(agent
*schedules.Agent, taskInfo *structur
}
}()
+ // TODO: Is here need a lock? TOCTTOU
Review Comment:
**潜在的并发安全问题**: 在第 423 行检查任务状态后到实际启动任务之间存在时间窗口,这是经典的 TOCTTOU (Time-of-check to Time-of-use) 问题。
**问题**:
- 在检查 taskInfo.State 和调用 taskStarter.StartTask() 之间,任务状态可能被其他 goroutine 修改
- 这可能导致任务被重复启动或状态不一致
**建议**:
1. 在 taskStarter.StartTask() 内部进行原子性的状态检查和转换
2. 使用互斥锁保护整个检查和启动过程
3. 使用 atomic.CompareAndSwap 来原子地检查和更新状态
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -47,22 +90,135 @@ func (s *ScheduleBl) Init() {
logrus.Infof("using default start_chan_size: %s",
defaultChanSizeConfig)
chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
}
- startChan := make(chan *structure.TaskInfo, chanSizeInt)
- s.startChan = startChan
- s.spaceQueue = (&schedules.SpaceQueue{}).Init()
- s.broker = (&schedules.Broker{}).Init()
+ s.startChanSize = chanSizeInt
- go s.waitingTask()
- go s.startTicker()
+ // tickerInterval
+ const defaultTickerInterval = "3"
+ tickerInterval := common.GetConfigDefault("ticker_interval",
defaultTickerInterval).(string)
+ tickerIntervalInt, err := strconv.Atoi(tickerInterval)
+ if err != nil {
+ logrus.Errorf("failed to convert ticker_interval to int: %v",
err)
+ logrus.Infof("using default ticker_interval: %s",
defaultTickerInterval)
+ tickerIntervalInt, _ = strconv.Atoi(defaultTickerInterval)
+ }
+ s.tickerInterval = tickerIntervalInt
+
+ // softSchedule
+ softSchedule := common.GetConfigDefault("soft_schedule",
"true").(string)
+ if softSchedule == "true" {
+ s.softSchedule = true
+ } else {
+ s.softSchedule = false
+ }
+
+ logrus.Infof("ScheduleBl configuration: startChanSize=%d,
tickerInterval=%d, softSchedule=%v",
+ s.startChanSize, s.tickerInterval, s.softSchedule)
}
-func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
- return s.spaceQueue.PeekTailTask(space)
+/*
+* @Description: startTicker starts the ticker.
+* @Note: This function will start the ticker.
+ */
+func (s *ScheduleBl) startTicker() {
+ // Create a ticker with the specified interval
+ ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second)
+
+ for range ticker {
+ logrus.Debug("Ticker ticked")
+ s.TryScheduleNextTasks()
+ }
+}
+
+// this make scheduler manager try to schedule next tasks
+/*
+* @Description: TryScheduleNextTasks tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param noLock
+ */
+func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
+ defer func() {
+ if err := recover(); err != nil {
+ logrus.Errorln("TryScheduleNextTasks() has been
recovered:", err)
+ }
+ }()
+
+ if err := s.tryScheduleInner(s.softSchedule, noLock...); err != nil {
+ logrus.Errorf("do scheduling error:%v", err)
+ }
+}
+
+// Main routine to schedule tasks
+/*
+* @Description: tryScheduleInner tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param softSchedule
+* @Param noLock
+ */
+func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error
{
+ // Implement logic to get the next task in the queue for the given space
+ if !(len(noLock) > 0 && noLock[0]) {
+ defer s.Unlock(s.Lock())
+ }
+
+ // step 1: make sure all tasks have alloc to a worker group
+ // This is done by the TaskManager, which assigns a worker group to
each task
+ s.taskManager.RefreshTaskToWorkerGroupMap()
+
+ // step 2: get available resources and tasks
+ logrus.Debugf("scheduling next tasks, softSchedule: %v", softSchedule)
+ idleWorkerGroups := s.resourceManager.GetIdleWorkerGroups()
+ concurrentWorkerGroups := s.resourceManager.GetConcurrentWorkerGroups()
+ allTasks := s.taskManager.GetAllTasksNotComplete()
+ if len(allTasks) == 0 || (len(idleWorkerGroups) == 0 &&
len(concurrentWorkerGroups) == 0) {
Review Comment:
**死锁风险**: BatchQueueTask 调用 PauseDispatch 后在 defer 中调用 ResumeDispatch,但被注释掉的锁代码表明这里可能存在锁的设计问题。
**问题**:
1. 注释掉的 defer s.Unlock(s.Lock()) 需要解释原因
2. 如果在 PauseDispatch 和 ResumeDispatch 之间发生 panic,可能导致调度器永久暂停
3. PauseDispatch 和 ResumeDispatch 的实现可能不是线程安全的
**建议**:
1. 添加注释说明为什么这行锁被注释掉
2. 确保 PauseDispatch 和 ResumeDispatch 是线程安全的
3. 使用更明确的锁管理策略
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -47,22 +90,135 @@ func (s *ScheduleBl) Init() {
logrus.Infof("using default start_chan_size: %s",
defaultChanSizeConfig)
chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
}
- startChan := make(chan *structure.TaskInfo, chanSizeInt)
- s.startChan = startChan
- s.spaceQueue = (&schedules.SpaceQueue{}).Init()
- s.broker = (&schedules.Broker{}).Init()
+ s.startChanSize = chanSizeInt
- go s.waitingTask()
- go s.startTicker()
+ // tickerInterval
+ const defaultTickerInterval = "3"
+ tickerInterval := common.GetConfigDefault("ticker_interval",
defaultTickerInterval).(string)
+ tickerIntervalInt, err := strconv.Atoi(tickerInterval)
+ if err != nil {
+ logrus.Errorf("failed to convert ticker_interval to int: %v",
err)
+ logrus.Infof("using default ticker_interval: %s",
defaultTickerInterval)
+ tickerIntervalInt, _ = strconv.Atoi(defaultTickerInterval)
+ }
+ s.tickerInterval = tickerIntervalInt
+
+ // softSchedule
+ softSchedule := common.GetConfigDefault("soft_schedule",
"true").(string)
+ if softSchedule == "true" {
+ s.softSchedule = true
+ } else {
+ s.softSchedule = false
+ }
+
+ logrus.Infof("ScheduleBl configuration: startChanSize=%d,
tickerInterval=%d, softSchedule=%v",
+ s.startChanSize, s.tickerInterval, s.softSchedule)
}
-func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
- return s.spaceQueue.PeekTailTask(space)
+/*
+* @Description: startTicker starts the ticker.
+* @Note: This function will start the ticker.
+ */
+func (s *ScheduleBl) startTicker() {
+ // Create a ticker with the specified interval
+ ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second)
+
+ for range ticker {
+ logrus.Debug("Ticker ticked")
+ s.TryScheduleNextTasks()
+ }
+}
+
+// this make scheduler manager try to schedule next tasks
+/*
+* @Description: TryScheduleNextTasks tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param noLock
+ */
+func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
+ defer func() {
+ if err := recover(); err != nil {
+ logrus.Errorln("TryScheduleNextTasks() has been
recovered:", err)
+ }
+ }()
+
+ if err := s.tryScheduleInner(s.softSchedule, noLock...); err != nil {
+ logrus.Errorf("do scheduling error:%v", err)
+ }
+}
+
+// Main routine to schedule tasks
+/*
+* @Description: tryScheduleInner tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param softSchedule
+* @Param noLock
+ */
+func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error
{
+ // Implement logic to get the next task in the queue for the given space
+ if !(len(noLock) > 0 && noLock[0]) {
+ defer s.Unlock(s.Lock())
+ }
+
+ // step 1: make sure all tasks have alloc to a worker group
+ // This is done by the TaskManager, which assigns a worker group to
each task
+ s.taskManager.RefreshTaskToWorkerGroupMap()
+
+ // step 2: get available resources and tasks
+ logrus.Debugf("scheduling next tasks, softSchedule: %v", softSchedule)
+ idleWorkerGroups := s.resourceManager.GetIdleWorkerGroups()
+ concurrentWorkerGroups := s.resourceManager.GetConcurrentWorkerGroups()
+ allTasks := s.taskManager.GetAllTasksNotComplete()
+ if len(allTasks) == 0 || (len(idleWorkerGroups) == 0 &&
len(concurrentWorkerGroups) == 0) {
+ logrus.Debugf("no available tasks or workerGroups, allTasks:
%d, workerGroups: %d/%d",
+ len(allTasks), len(idleWorkerGroups),
len(concurrentWorkerGroups))
+ return nil
+ }
+ logrus.Debugf("all tasks: %d, workerGroups: %d/%d", len(allTasks),
len(idleWorkerGroups), len(concurrentWorkerGroups))
Review Comment:
**严重的逻辑错误**: 使用 append 添加到预分配的 slice 会导致返回错误的结果。
**当前问题**:
```go
errors := make([]error, len(taskInfos)) // 创建 len 个 nil 元素
oks := make([]bool, len(taskInfos))     // 创建 len 个 false 元素
// ... 循环中
errors = append(errors, err) // 追加到末尾,导致 2*len 个元素
oks = append(oks, ok)
```
**结果**: 返回的 slice 长度是 2*len,前 len 个元素都是零值
**修复方案 1 - 使用容量**:
```go
errors := make([]error, 0, len(taskInfos))
oks := make([]bool, 0, len(taskInfos))
```
**修复方案 2 - 使用索引赋值**:
```go
for i, taskInfo := range taskInfos {
    ok, err := s.QueueTask(taskInfo)
    errors[i] = err
    oks[i] = ok
}
```
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -47,22 +90,135 @@ func (s *ScheduleBl) Init() {
logrus.Infof("using default start_chan_size: %s",
defaultChanSizeConfig)
chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
}
- startChan := make(chan *structure.TaskInfo, chanSizeInt)
- s.startChan = startChan
- s.spaceQueue = (&schedules.SpaceQueue{}).Init()
- s.broker = (&schedules.Broker{}).Init()
+ s.startChanSize = chanSizeInt
- go s.waitingTask()
- go s.startTicker()
+ // tickerInterval
+ const defaultTickerInterval = "3"
+ tickerInterval := common.GetConfigDefault("ticker_interval",
defaultTickerInterval).(string)
+ tickerIntervalInt, err := strconv.Atoi(tickerInterval)
+ if err != nil {
+ logrus.Errorf("failed to convert ticker_interval to int: %v",
err)
+ logrus.Infof("using default ticker_interval: %s",
defaultTickerInterval)
+ tickerIntervalInt, _ = strconv.Atoi(defaultTickerInterval)
+ }
+ s.tickerInterval = tickerIntervalInt
+
+ // softSchedule
+ softSchedule := common.GetConfigDefault("soft_schedule",
"true").(string)
+ if softSchedule == "true" {
+ s.softSchedule = true
+ } else {
+ s.softSchedule = false
+ }
+
+ logrus.Infof("ScheduleBl configuration: startChanSize=%d,
tickerInterval=%d, softSchedule=%v",
+ s.startChanSize, s.tickerInterval, s.softSchedule)
}
-func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
- return s.spaceQueue.PeekTailTask(space)
+/*
+* @Description: startTicker starts the ticker.
+* @Note: This function will start the ticker.
+ */
+func (s *ScheduleBl) startTicker() {
+ // Create a ticker with the specified interval
+ ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second)
+
+ for range ticker {
+ logrus.Debug("Ticker ticked")
+ s.TryScheduleNextTasks()
+ }
+}
+
+// this make scheduler manager try to schedule next tasks
+/*
+* @Description: TryScheduleNextTasks tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param noLock
+ */
+func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
+ defer func() {
+ if err := recover(); err != nil {
Review Comment:
**Channel 满时的任务丢失风险**: 当 startChan 满时使用 select default 直接失败,可能导致任务丢失。
**问题**:
1. 如果 channel 满了,任务会被标记为错误但没有重试机制
2. 这可能导致高优先级任务因为 channel 满而无法执行
3. 没有背压机制来限制任务提交速率
**建议**:
1. 使用阻塞发送而不是 select default,或者实现重试机制
2. 将任务放回队列而不是直接失败
3. 添加监控指标跟踪 channel 满的频率
4. 考虑使用有界队列配合背压机制
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -72,81 +228,161 @@ func (s *ScheduleBl) QueueTask(taskInfo
*structure.TaskInfo) (bool, error) {
return false, errors.New("the property `SpaceName` of taskInfo
is empty")
}
- //defer s.Unlock(s.Lock())
+ defer s.Unlock(s.Lock())
if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err
!= nil {
return false, err
}
+ logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID,
taskInfo)
+
+ // check dependency if exists
+ if len(taskInfo.Preorders) > 0 {
+ for _, depTaskID := range taskInfo.Preorders {
+ depTask := taskMgr.GetTaskByID(depTaskID)
Review Comment:
**依赖验证不充分**: 只检查依赖任务是否存在,没有验证:
**缺失的检查**:
1. 依赖任务的状态(是否完成、失败、取消)
2. 循环依赖检测
3. 依赖链深度限制
4. 依赖任务是否属于同一个 space
**建议**:
```go
for _, depTaskID := range taskInfo.Preorders {
    depTask := taskMgr.GetTaskByID(depTaskID)
    if depTask == nil {
        return false, fmt.Errorf("dependency task %d does not exist", depTaskID)
    }
    // 检查依赖任务状态
    if depTask.State == structure.TaskStateFailed {
        return false, fmt.Errorf("dependency task %d has failed", depTaskID)
    }
    // 检查循环依赖
    if hasCircularDependency(taskInfo.ID, depTaskID) {
        return false, errors.New("circular dependency detected")
    }
    // 检查 space 一致性
    if depTask.SpaceName != taskInfo.SpaceName {
        return false, errors.New("dependency task must be in same space")
    }
}
```
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -72,81 +228,161 @@ func (s *ScheduleBl) QueueTask(taskInfo
*structure.TaskInfo) (bool, error) {
return false, errors.New("the property `SpaceName` of taskInfo
is empty")
}
- //defer s.Unlock(s.Lock())
+ defer s.Unlock(s.Lock())
if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err
!= nil {
return false, err
}
+ logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID,
taskInfo)
+
+ // check dependency if exists
+ if len(taskInfo.Preorders) > 0 {
+ for _, depTaskID := range taskInfo.Preorders {
+ depTask := taskMgr.GetTaskByID(depTaskID)
+ if depTask == nil {
+ err := errors.New("the dependency task with ID
" + strconv.Itoa(int(depTaskID)) + " does not exist")
+ logrus.Error(err)
+ taskMgr.SetError(taskInfo, err.Error())
+ return false, err
+ }
+ }
+ }
+
// Notice: Ensure successful invocation.
- ok, err := s.spaceQueue.PushTask(taskInfo)
+ // make sure all tasks have alloc to a worker group
+ ok, err := s.taskManager.QueueTask(taskInfo)
if err != nil {
taskMgr.SetError(taskInfo, err.Error())
return ok, err
}
Review Comment:
**资源泄漏**: Cron 任务添加成功但后续 QueueTask 失败时,cron 任务不会被清理。
**问题场景**:
1. CheckCronExpression 返回 nil (表达式有效)
2. AddCronTask 成功,cron 任务开始调度
3. 后续代码失败,函数返回错误
4. Cron 任务继续运行,但原始任务状态是失败的
**建议 - 调整顺序**:
```go
// 先完成任务队列操作
ok, err := s.taskManager.QueueTask(taskInfo)
if err != nil {
    taskMgr.SetError(taskInfo, err.Error())
    return ok, err
}
// 任务队列成功后再添加 cron
if s.cronManager.CheckCronExpression(taskInfo.CronExpr) == nil {
    if err := s.cronManager.AddCronTask(taskInfo); err != nil {
        logrus.Errorf("failed to add cron task: %v", err)
        // 回滚:移除已队列的任务
        s.taskManager.RemoveTask(taskInfo.ID)
        return false, err
    }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]