This is an automated email from the ASF dual-hosted git repository.

narro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e224fa41 fix: high priority pipelines could be starved by `parallel/` label (#8568)
6e224fa41 is described below

commit 6e224fa41ce5abcfe3cc4567531a6ef878ff423e
Author: Klesh Wong <zhenmian.hu...@merico.dev>
AuthorDate: Mon Sep 22 08:47:36 2025 +0800

    fix: high priority pipelines could be starved by `parallel/` label (#8568)
    
    * fix: high priority pipelines could be starved by `parallel/` label
    
    * fix: failed on mysql
---
 backend/server/services/pipeline.go | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go
index 78e7df085..f6c770f9c 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -261,8 +261,21 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline,
        }))
        // prepare query to find an appropriate pipeline to execute
        pipeline = &models.Pipeline{}
+       // 1. find out the current highest priority in the queue
+       top_priority := 0
+       var top_priorities []int
+       where_status := dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME})
+       err = tx.Pluck("priority", &top_priorities, dal.From(pipeline), where_status, dal.Orderby("priority DESC"), dal.Limit(1))
+       if err != nil {
+               panic(err)
+       }
+       if len(top_priorities) > 0 {
+               top_priority = top_priorities[0]
+       }
+       // 2. pick the earlier runnable pipeline with the highest priority
        err = tx.First(pipeline,
-               dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}),
+               where_status,
+               dal.Where("priority = ?", top_priority),
                dal.Join(
                        `left join _devlake_pipeline_labels ON
                                _devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND
@@ -270,10 +283,10 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline,
                                _devlake_pipeline_labels.name in ?`,
                        runningParallelLabels,
                ),
-               dal.Groupby("priority, id"),
+               dal.Groupby("id"),
                dal.Having("count(_devlake_pipeline_labels.name)=0"),
                dal.Select("id"),
-               dal.Orderby("priority DESC, id ASC"),
+               dal.Orderby("id ASC"),
                dal.Limit(1),
        )
        if err == nil {

Reply via email to