imbajin commented on code in PR #336:
URL:
https://github.com/apache/incubator-hugegraph-computer/pull/336#discussion_r2422716174
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -28,16 +28,58 @@ import (
"github.com/sirupsen/logrus"
)
+/*
+* @Description: ScheduleBl is the scheduler business logic.
+* @Note: This is the main scheduler business logic.
+ */
type ScheduleBl struct {
structure.MutexLocker
- dispatchLocker structure.MutexLocker
- spaceQueue *schedules.SpaceQueue
- broker *schedules.Broker
- startChan chan *structure.TaskInfo
- isDispatchPaused bool
+ // resource management
+ resourceManager *schedules.SchedulerResourceManager
+ // algorithm management
+ algorithmManager *schedules.SchedulerAlgorithmManager
+ // task management
+ taskManager *schedules.SchedulerTaskManager
+ // cron management
+ cronManager *schedules.SchedulerCronManager
+ // start channel for tasks to be started
+ startChan chan *structure.TaskInfo
+ // configurations
+ startChanSize int
+ tickerInterval int
+ softSchedule bool
}
+/*
+* @Description: Init initializes the ScheduleBl.
+* @Note: This function will initialize the ScheduleBl.
+ */
func (s *ScheduleBl) Init() {
+ logrus.Info("Initializing ScheduleBl...")
+ s.LoadConfig()
+ startChan := make(chan *structure.TaskInfo, s.startChanSize)
+ s.startChan = startChan
+
+ s.resourceManager = &schedules.SchedulerResourceManager{}
+ s.resourceManager.Init()
+ s.taskManager = &schedules.SchedulerTaskManager{}
+ s.taskManager.Init()
+ s.algorithmManager = &schedules.SchedulerAlgorithmManager{}
Review Comment:
**Resource leak: goroutines started without context**
The goroutines started in `Init()` run indefinitely without any shutdown
mechanism. This prevents graceful shutdown and testing.
**Suggested improvements:**
```go
type ScheduleBl struct {
// ... existing fields
ctx context.Context
cancel context.CancelFunc
}
func (s *ScheduleBl) Init() {
logrus.Info("Initializing ScheduleBl...")
s.LoadConfig()
s.ctx, s.cancel = context.WithCancel(context.Background())
startChan := make(chan *structure.TaskInfo, s.startChanSize)
s.startChan = startChan
// ... rest of initialization
go s.startTicker(s.ctx)
go s.waitingStartedTask(s.ctx)
}
func (s *ScheduleBl) Shutdown() {
s.cancel()
close(s.startChan)
}
func (s *ScheduleBl) startTicker(ctx context.Context) {
ticker := time.NewTicker(time.Duration(s.tickerInterval) * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
logrus.Debug("Ticker ticked")
s.TryScheduleNextTasks()
}
}
}
```
##########
vermeer/apps/master/bl/scheduler_bl.go:
##########
@@ -28,16 +28,58 @@ import (
"github.com/sirupsen/logrus"
)
+/*
+* @Description: ScheduleBl is the scheduler business logic.
+* @Note: This is the main scheduler business logic.
+ */
type ScheduleBl struct {
structure.MutexLocker
- dispatchLocker structure.MutexLocker
- spaceQueue *schedules.SpaceQueue
- broker *schedules.Broker
- startChan chan *structure.TaskInfo
- isDispatchPaused bool
+ // resource management
+ resourceManager *schedules.SchedulerResourceManager
+ // algorithm management
+ algorithmManager *schedules.SchedulerAlgorithmManager
+ // task management
+ taskManager *schedules.SchedulerTaskManager
+ // cron management
+ cronManager *schedules.SchedulerCronManager
+ // start channel for tasks to be started
+ startChan chan *structure.TaskInfo
+ // configurations
+ startChanSize int
+ tickerInterval int
+ softSchedule bool
}
+/*
+* @Description: Init initializes the ScheduleBl.
+* @Note: This function will initialize the ScheduleBl.
+ */
func (s *ScheduleBl) Init() {
+ logrus.Info("Initializing ScheduleBl...")
+ s.LoadConfig()
+ startChan := make(chan *structure.TaskInfo, s.startChanSize)
+ s.startChan = startChan
+
+ s.resourceManager = &schedules.SchedulerResourceManager{}
+ s.resourceManager.Init()
+ s.taskManager = &schedules.SchedulerTaskManager{}
+ s.taskManager.Init()
+ s.algorithmManager = &schedules.SchedulerAlgorithmManager{}
+ s.algorithmManager.Init()
+ s.cronManager = &schedules.SchedulerCronManager{}
+ s.cronManager.Init(s.QueueTaskFromTemplate)
+ go s.startTicker()
+ go s.waitingStartedTask()
+}
+
+/*
+* @Description: LoadConfig loads the configuration from the common package.
+* @Note: This function will load the configuration from the common package.
+ */
+func (s *ScheduleBl) LoadConfig() {
+ // Load configuration from common package
+
+ // startChanSize
const defaultChanSizeConfig = "10"
chanSize := common.GetConfigDefault("start_chan_size",
defaultChanSizeConfig).(string)
Review Comment:
**Missing bounds validation for configuration values**
The configuration loading doesn't validate that the parsed integer values
are within reasonable bounds. Negative or extremely large values could cause
issues.
**Suggested fix:**
```go
const (
minStartChanSize = 1
maxStartChanSize = 10000
minTickerInterval = 1
maxTickerInterval = 3600
)
chanSizeInt, err := strconv.Atoi(chanSize)
if err != nil {
logrus.Errorf("failed to convert start_chan_size to int: %v", err)
logrus.Infof("using default start_chan_size: %s", defaultChanSizeConfig)
chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
} else if chanSizeInt < minStartChanSize || chanSizeInt > maxStartChanSize {
logrus.Warnf("start_chan_size %d out of range [%d, %d], using default",
chanSizeInt, minStartChanSize, maxStartChanSize)
chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
}
s.startChanSize = chanSizeInt
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]