likyh commented on code in PR #2569:
URL: https://github.com/apache/incubator-devlake/pull/2569#discussion_r926775652


##########
services/blueprint.go:
##########
@@ -312,5 +311,5 @@ func TriggerBlueprint(id uint64) (*models.Pipeline, error) {
        }
        pipeline, err := createAndRunPipelineByBlueprint(blueprint.ID, blueprint.Name, plan)

Review Comment:
   createPipelineByBlueprint



##########
services/pipeline.go:
##########
@@ -183,8 +191,40 @@ func GetPipeline(pipelineId uint64) (*models.Pipeline, error) {
        return pipeline, nil
 }
 
-// RunPipeline FIXME ...
-func RunPipeline(pipelineId uint64) error {
+// RunPipelineInQueue query pipeline from db and run it in a queue
+func RunPipelineInQueue(pipelineMaxParallel int64) {
+       sema := semaphore.NewWeighted(pipelineMaxParallel)
+       startedPipelineIds := []uint64{}
+       for true {
+               pipelineLog.Info("wait for new pipeline")
+               // start goroutine when sema lock ready and pipeline exist.
+               // to avoid read old pipeline, acquire lock before read exist pipeline
+               err := sema.Acquire(context.TODO(), 1)
+               if err != nil {
+                       panic(err)
+               }
+               pipelineLog.Info("get lock and wait pipeline")
+               pipeline := &models.Pipeline{}
+               for true {
+                       db.Where("status = ?", models.TASK_CREATED).
+                               Not(startedPipelineIds).
+                               Order("id ASC").Limit(1).Find(pipeline)
+                       if pipeline.ID != 0 {
+                               break
+                       }
+                       time.Sleep(time.Second)
+               }
+               startedPipelineIds = append(startedPipelineIds, pipeline.ID)
+               go func() {
+                       pipelineLog.Info("run pipeline, %d", pipeline.ID)
+                       _ = runPipeline(pipeline.ID)
+                       defer sema.Release(1)

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to