This is an automated email from the ASF dual-hosted git repository. narro pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push: new 7b7875047 feat: pipeline scheduler supports priority (#8534) 7b7875047 is described below commit 7b7875047b58e51197f85df2fcc26286355529d4 Author: Klesh Wong <zhenmian.hu...@merico.dev> AuthorDate: Thu Aug 14 10:31:23 2025 +0800 feat: pipeline scheduler supports priority (#8534) --- backend/core/models/blueprint.go | 1 + .../20250813_add_pipeline_priority.go | 57 ++++++++++++++++++++++ backend/core/models/migrationscripts/register.go | 1 + backend/core/models/pipeline.go | 2 + backend/plugins/org/impl/impl.go | 1 + backend/plugins/org/tasks/sleep.go | 40 +++++++++++++++ backend/plugins/org/tasks/task_data.go | 1 + backend/server/services/blueprint.go | 1 + backend/server/services/pipeline.go | 2 +- backend/server/services/pipeline_helper.go | 1 + config-ui/.yarnrc.yml | 2 + config-ui/package.json | 4 ++ 12 files changed, 112 insertions(+), 1 deletion(-) diff --git a/backend/core/models/blueprint.go b/backend/core/models/blueprint.go index f30adec9f..a9b86d635 100644 --- a/backend/core/models/blueprint.go +++ b/backend/core/models/blueprint.go @@ -41,6 +41,7 @@ type Blueprint struct { AfterPlan PipelinePlan `json:"afterPlan" gorm:"serializer:encdec"` Labels []string `json:"labels" gorm:"-"` Connections []*BlueprintConnection `json:"connections" gorm:"-"` + Priority int `json:"priority"` // greater is higher SyncPolicy `gorm:"embedded"` common.Model `swaggerignore:"true"` } diff --git a/backend/core/models/migrationscripts/20250813_add_pipeline_priority.go b/backend/core/models/migrationscripts/20250813_add_pipeline_priority.go new file mode 100644 index 000000000..8697fb6a3 --- /dev/null +++ b/backend/core/models/migrationscripts/20250813_add_pipeline_priority.go @@ -0,0 +1,57 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. 
+The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package migrationscripts + +import ( + "github.com/apache/incubator-devlake/core/context" + "github.com/apache/incubator-devlake/core/errors" + "github.com/apache/incubator-devlake/core/plugin" + "github.com/apache/incubator-devlake/helpers/migrationhelper" +) + +var _ plugin.MigrationScript = (*addPipelinePriority)(nil) + +type addPipelinePriority struct{} + +type blueprint20250813 struct { + Priority int `json:"priority"` +} + +func (blueprint20250813) TableName() string { + return "_devlake_blueprints" +} + +type pipeline20250813 struct { + Priority int `json:"priority"` +} + +func (pipeline20250813) TableName() string { + return "_devlake_pipelines" +} + +func (script *addPipelinePriority) Up(basicRes context.BasicRes) errors.Error { + return migrationhelper.AutoMigrateTables(basicRes, new(blueprint20250813), new(pipeline20250813)) +} + +func (*addPipelinePriority) Version() uint64 { + return 20250813151534 +} + +func (*addPipelinePriority) Name() string { + return "add priority to blueprints and pipelines" +} diff --git a/backend/core/models/migrationscripts/register.go b/backend/core/models/migrationscripts/register.go index 58c773f46..c14ae8740 100644 --- a/backend/core/models/migrationscripts/register.go +++ b/backend/core/models/migrationscripts/register.go @@ -139,5 +139,6 @@ func All() []plugin.MigrationScript { new(increaseCqIssueComponentLength), new(extendFieldSizeForCq), new(addIssueFixVerion), + 
new(addPipelinePriority), } } diff --git a/backend/core/models/pipeline.go b/backend/core/models/pipeline.go index 2cdc3413f..f9613dd0e 100644 --- a/backend/core/models/pipeline.go +++ b/backend/core/models/pipeline.go @@ -66,6 +66,7 @@ type Pipeline struct { SpentSeconds int `json:"spentSeconds"` Stage int `json:"stage"` Labels []string `json:"labels" gorm:"-"` + Priority int `json:"priority"` // greater is higher SyncPolicy `gorm:"embedded"` } @@ -75,6 +76,7 @@ type NewPipeline struct { Name string `json:"name"` Plan PipelinePlan `json:"plan" swaggertype:"array,string" example:"please check api /pipelines/<PLUGIN_NAME>/pipeline-plan"` Labels []string `json:"labels"` + Priority int `json:"priority"` // greater is higher BlueprintId uint64 SyncPolicy `gorm:"embedded"` } diff --git a/backend/plugins/org/impl/impl.go b/backend/plugins/org/impl/impl.go index cd4ffa7fd..e68257eec 100644 --- a/backend/plugins/org/impl/impl.go +++ b/backend/plugins/org/impl/impl.go @@ -61,6 +61,7 @@ func (p Org) SubTaskMetas() []plugin.SubTaskMeta { return []plugin.SubTaskMeta{ tasks.ConnectUserAccountsExactMeta, tasks.SetProjectMappingMeta, + tasks.SleepMeta, } } diff --git a/backend/plugins/org/tasks/sleep.go b/backend/plugins/org/tasks/sleep.go new file mode 100644 index 000000000..43c887b1c --- /dev/null +++ b/backend/plugins/org/tasks/sleep.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tasks + +import ( + "time" + + "github.com/apache/incubator-devlake/core/errors" + "github.com/apache/incubator-devlake/core/plugin" +) + +var SleepMeta = plugin.SubTaskMeta{ + Name: "sleep", + EntryPoint: Sleep, + EnabledByDefault: false, + Description: "for debugging only", + DomainTypes: []string{plugin.DOMAIN_TYPE_CROSS}, +} + +// Sleep blocks for Options.SleepSeconds seconds and then returns nil; for debugging only +func Sleep(taskCtx plugin.SubTaskContext) errors.Error { + data := taskCtx.GetData().(*TaskData) + time.Sleep(time.Duration(data.Options.SleepSeconds) * time.Second) + return nil +} diff --git a/backend/plugins/org/tasks/task_data.go b/backend/plugins/org/tasks/task_data.go index ca2a9b300..e3b3cae1c 100644 --- a/backend/plugins/org/tasks/task_data.go +++ b/backend/plugins/org/tasks/task_data.go @@ -22,6 +22,7 @@ import "github.com/apache/incubator-devlake/core/plugin" type Options struct { ConnectionId uint64 `json:"connectionId"` ProjectMappings []ProjectMapping `json:"projectMappings"` + SleepSeconds uint64 `json:"sleepSeconds"` } // ProjectMapping represents the relations between project and scopes diff --git a/backend/server/services/blueprint.go b/backend/server/services/blueprint.go index 470f8bcdd..91bab6934 100644 --- a/backend/server/services/blueprint.go +++ b/backend/server/services/blueprint.go @@ -327,6 +327,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint, syncPolicy *models.S newPipeline.Name = blueprint.Name newPipeline.BlueprintId = blueprint.ID newPipeline.Labels = blueprint.Labels + newPipeline.Priority = blueprint.Priority newPipeline.SyncPolicy = 
blueprint.SyncPolicy // if the plan is empty, we should not create the pipeline diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go index 413ba3db6..0efccbc3d 100644 --- a/backend/server/services/pipeline.go +++ b/backend/server/services/pipeline.go @@ -273,7 +273,7 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline, dal.Groupby("id"), dal.Having("count(_devlake_pipeline_labels.name)=0"), dal.Select("id"), - dal.Orderby("id ASC"), + dal.Orderby("priority DESC, id ASC"), dal.Limit(1), ) if err == nil { diff --git a/backend/server/services/pipeline_helper.go b/backend/server/services/pipeline_helper.go index 0e62bc6c7..d7da0c307 100644 --- a/backend/server/services/pipeline_helper.go +++ b/backend/server/services/pipeline_helper.go @@ -67,6 +67,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipelin Message: "", SpentSeconds: 0, Plan: newPipeline.Plan, + Priority: newPipeline.Priority, SyncPolicy: newPipeline.SyncPolicy, } if newPipeline.BlueprintId != 0 { diff --git a/config-ui/.yarnrc.yml b/config-ui/.yarnrc.yml index 50f1cf9de..9b0fb2d49 100644 --- a/config-ui/.yarnrc.yml +++ b/config-ui/.yarnrc.yml @@ -16,4 +16,6 @@ # nodeLinker: node-modules +npmRegistryServer: "https://registry.npmmirror.com" + yarnPath: .yarn/releases/yarn-3.4.1.cjs diff --git a/config-ui/package.json b/config-ui/package.json index 99d10bf19..c10c1c9d9 100644 --- a/config-ui/package.json +++ b/config-ui/package.json @@ -69,5 +69,9 @@ "typescript": "^5.1.6", "vite": "^5.1.4", "vite-plugin-svgr": "^4.2.0" + }, + "volta": { + "node": "18.20.8", + "yarn": "3.4.1" } }