This is an automated email from the ASF dual-hosted git repository. warren pushed a commit to branch fix/pipeline-panic-to-error in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 4e1bef64c1314bf48ab598948cdbd569cf866547
Author: warren <[email protected]>
AuthorDate: Mon Mar 16 00:15:42 2026 +0800

    fix: replace panics with error returns in pipeline and locking services

    In dequeuePipeline, replace panic(err) with proper error returns so that
    the calling loop in RunPipelineInQueue can handle failures gracefully.
    In RunPipelineInQueue, log and continue instead of panicking when
    fillPipelineDetail fails. In lockDatabase, use errors.Must for consistent
    error handling.
---
 backend/server/services/locking.go  | 3 +--
 backend/server/services/pipeline.go | 8 +++++---
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/backend/server/services/locking.go b/backend/server/services/locking.go
index f663b0c62..1ff761b9f 100644
--- a/backend/server/services/locking.go
+++ b/backend/server/services/locking.go
@@ -18,7 +18,6 @@ limitations under the License.
 package services
 
 import (
-	"fmt"
 	"os"
 	"time"
 
@@ -61,6 +60,6 @@ func lockDatabase() {
 	select {
 	case <-c:
 	case <-time.After(10 * time.Second):
-		panic(fmt.Errorf("locking _devlake_locking_stub timeout, the database might be locked by another devlake instance"))
+		errors.Must(errors.Default.New("locking _devlake_locking_stub timeout, the database might be locked by another devlake instance"))
 	}
 }
diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go
index f6c770f9c..eaa7f3cc2 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -267,7 +267,7 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline,
 	where_status := dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME})
 	err = tx.Pluck("priority", &top_priorities, dal.From(pipeline), where_status, dal.Orderby("priority DESC"), dal.Limit(1))
 	if err != nil {
-		panic(err)
+		return nil, err
 	}
 	if len(top_priorities) > 0 {
 		top_priority = top_priorities[0]
@@ -303,7 +303,7 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline,
 		{ColumnName: "began_at", Value: pipeline.BeganAt},
 	}, dal.Where("id = ?", pipeline.ID))
 	if err != nil {
-		panic(err)
+		return nil, err
 	}
 
 	return
@@ -341,7 +341,9 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
 
 		err = fillPipelineDetail(dbPipeline)
 		if err != nil {
-			panic(err)
+			globalPipelineLog.Error(err, "failed to fill pipeline detail for pipeline #%d", dbPipeline.ID)
+			sema.Release(1)
+			continue
 		}
 		// add pipelineParallelLabels to runningParallelLabels
 		var pipelineParallelLabels []string
