imbajin commented on code in PR #336:
URL: 
https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2444649992


##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -72,81 +228,161 @@ func (s *ScheduleBl) QueueTask(taskInfo 
*structure.TaskInfo) (bool, error) {
                return false, errors.New("the property `SpaceName` of taskInfo 
is empty")
        }
 
-       //defer s.Unlock(s.Lock())
+       defer s.Unlock(s.Lock())
        if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err 
!= nil {
                return false, err
        }
 
+       logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID, 
taskInfo)
+
+       // check dependency if exists
+       if len(taskInfo.Preorders) > 0 {
+               for _, depTaskID := range taskInfo.Preorders {
+                       depTask := taskMgr.GetTaskByID(depTaskID)
+                       if depTask == nil {
+                               err := errors.New("the dependency task with ID 
" + strconv.Itoa(int(depTaskID)) + " does not exist")
+                               logrus.Error(err)
+                               taskMgr.SetError(taskInfo, err.Error())
+                               return false, err
+                       }
+               }
+       }
+
        // Notice: Ensure successful invocation.
-       ok, err := s.spaceQueue.PushTask(taskInfo)
+       // make sure all tasks have alloc to a worker group
+       ok, err := s.taskManager.QueueTask(taskInfo)
        if err != nil {
                taskMgr.SetError(taskInfo, err.Error())
                return ok, err
        }
 

Review Comment:
   **资源泄漏**: Cron 任务添加成功但后续 QueueTask 失败时,cron 任务不会被清理。
   
   **问题场景**:
   1. CheckCronExpression 返回 nil (表达式有效)
   2. AddCronTask 成功,cron 任务开始调度
   3. 后续代码失败,函数返回错误
   4. Cron 任务继续运行,但原始任务状态是失败的
   
   **建议 - 调整顺序**:
   ```go
   // 先完成任务队列操作
   ok, err := s.taskManager.QueueTask(taskInfo)
   if err != nil {
       taskMgr.SetError(taskInfo, err.Error())
       return ok, err
   }
   
   // 任务队列成功后再添加 cron
   if s.cronManager.CheckCronExpression(taskInfo.CronExpr) == nil {
       if err := s.cronManager.AddCronTask(taskInfo); err != nil {
           logrus.Errorf("failed to add cron task: %v", err)
           // 回滚:移除已队列的任务
           s.taskManager.RemoveTask(taskInfo.ID)
           return false, err
       }
   }
   ```
   



##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -72,81 +228,161 @@ func (s *ScheduleBl) QueueTask(taskInfo 
*structure.TaskInfo) (bool, error) {
                return false, errors.New("the property `SpaceName` of taskInfo 
is empty")
        }
 
-       //defer s.Unlock(s.Lock())
+       defer s.Unlock(s.Lock())
        if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err 
!= nil {
                return false, err
        }
 
+       logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID, 
taskInfo)
+
+       // check dependency if exists
+       if len(taskInfo.Preorders) > 0 {
+               for _, depTaskID := range taskInfo.Preorders {
+                       depTask := taskMgr.GetTaskByID(depTaskID)

Review Comment:
   **依赖验证不充分**: 只检查依赖任务是否存在,没有验证:
   
   **缺失的检查**:
   1. 依赖任务的状态(是否完成、失败、取消)
   2. 循环依赖检测
   3. 依赖链深度限制
   4. 依赖任务是否属于同一个 space
   
   **建议**:
   ```go
   for _, depTaskID := range taskInfo.Preorders {
       depTask := taskMgr.GetTaskByID(depTaskID)
       if depTask == nil {
           return false, fmt.Errorf("dependency task %d does not exist", depTaskID)
       }
   
       // 检查依赖任务状态
       if depTask.State == structure.TaskStateFailed {
           return false, fmt.Errorf("dependency task %d has failed", depTaskID)
       }
   
       // 检查循环依赖
       if hasCircularDependency(taskInfo.ID, depTaskID) {
           return false, errors.New("circular dependency detected")
       }
   
       // 检查 space 一致性
       if depTask.SpaceName != taskInfo.SpaceName {
           return false, errors.New("dependency task must be in same space")
       }
   }
   ```
   



##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -47,22 +90,135 @@ func (s *ScheduleBl) Init() {
                logrus.Infof("using default start_chan_size: %s", 
defaultChanSizeConfig)
                chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
        }
-       startChan := make(chan *structure.TaskInfo, chanSizeInt)
-       s.startChan = startChan
-       s.spaceQueue = (&schedules.SpaceQueue{}).Init()
-       s.broker = (&schedules.Broker{}).Init()
+       s.startChanSize = chanSizeInt
 
-       go s.waitingTask()
-       go s.startTicker()
+       // tickerInterval
+       const defaultTickerInterval = "3"
+       tickerInterval := common.GetConfigDefault("ticker_interval", 
defaultTickerInterval).(string)
+       tickerIntervalInt, err := strconv.Atoi(tickerInterval)
+       if err != nil {
+               logrus.Errorf("failed to convert ticker_interval to int: %v", 
err)
+               logrus.Infof("using default ticker_interval: %s", 
defaultTickerInterval)
+               tickerIntervalInt, _ = strconv.Atoi(defaultTickerInterval)
+       }
+       s.tickerInterval = tickerIntervalInt
+
+       // softSchedule
+       softSchedule := common.GetConfigDefault("soft_schedule", 
"true").(string)
+       if softSchedule == "true" {
+               s.softSchedule = true
+       } else {
+               s.softSchedule = false
+       }
+
+       logrus.Infof("ScheduleBl configuration: startChanSize=%d, 
tickerInterval=%d, softSchedule=%v",
+               s.startChanSize, s.tickerInterval, s.softSchedule)
 }
 
-func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
-       return s.spaceQueue.PeekTailTask(space)
+/*
+* @Description: startTicker starts the ticker.
+* @Note: This function will start the ticker.
+ */
+func (s *ScheduleBl) startTicker() {
+       // Create a ticker with the specified interval
+       ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second)
+
+       for range ticker {
+               logrus.Debug("Ticker ticked")
+               s.TryScheduleNextTasks()
+       }
+}
+
+// this make scheduler manager try to schedule next tasks
+/*
+* @Description: TryScheduleNextTasks tries to schedule the next tasks.
+* @Note: This function will try to schedule the next tasks.
+* @Param noLock
+ */
+func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
+       defer func() {
+               if err := recover(); err != nil {

Review Comment:
   **Channel 满时的任务丢失风险**: 当 startChan 满时使用 select default 
直接失败,可能导致任务丢失。
   
   **问题**:
   1. 如果 channel 满了,任务会被标记为错误但没有重试机制
   2. 这可能导致高优先级任务因为 channel 满而无法执行
   3. 没有背压机制来限制任务提交速率
   
   **建议**:
   1. 使用阻塞发送而不是 select default,或者实现重试机制
   2. 将任务放回队列而不是直接失败
   3. 添加监控指标跟踪 channel 满的频率
   4. 考虑使用有界队列配合背压机制
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to