This is an automated email from the ASF dual-hosted git repository. narro pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push: new 6e224fa41 fix: high priority pipelines could be starved by `parallel/` label (#8568) 6e224fa41 is described below commit 6e224fa41ce5abcfe3cc4567531a6ef878ff423e Author: Klesh Wong <zhenmian.hu...@merico.dev> AuthorDate: Mon Sep 22 08:47:36 2025 +0800 fix: high priority pipelines could be starved by `parallel/` label (#8568) * fix: high priority pipelines could be starved by `parallel/` label * fix: failed on mysql --- backend/server/services/pipeline.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go index 78e7df085..f6c770f9c 100644 --- a/backend/server/services/pipeline.go +++ b/backend/server/services/pipeline.go @@ -261,8 +261,21 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline, })) // prepare query to find an appropriate pipeline to execute pipeline = &models.Pipeline{} + // 1. find out the current highest priority in the queue + top_priority := 0 + var top_priorities []int + where_status := dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}) + err = tx.Pluck("priority", &top_priorities, dal.From(pipeline), where_status, dal.Orderby("priority DESC"), dal.Limit(1)) + if err != nil { + panic(err) + } + if len(top_priorities) > 0 { + top_priority = top_priorities[0] + } + // 2. pick the earlier runnable pipeline with the highest priority err = tx.First(pipeline, - dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}), + where_status, + dal.Where("priority = ?", top_priority), dal.Join( `left join _devlake_pipeline_labels ON _devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND @@ -270,10 +283,10 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline, _devlake_pipeline_labels.name in ?`, runningParallelLabels, ), - dal.Groupby("priority, id"), + dal.Groupby("id"), dal.Having("count(_devlake_pipeline_labels.name)=0"), dal.Select("id"), - dal.Orderby("priority DESC, id ASC"), + dal.Orderby("id ASC"), dal.Limit(1), ) if err == nil {