This is an automated email from the ASF dual-hosted git repository. warren pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 1de8a6b3e85f390f1123671ce36581d4eb324675 Author: Klesh Wong <[email protected]> AuthorDate: Wed Jun 22 22:43:32 2022 +0800 feat: blueprint normal mode framework-side --- impl/dalgorm/dalgorm.go | 6 +- models/blueprint.go | 10 +- .../220622-blueprint-normal-mode.go | 69 ++++++++++++++ models/migrationscripts/register.go | 1 + models/pipeline.go | 7 +- models/task.go | 11 +-- plugins/core/dal/dal.go | 3 + plugins/core/plugin_blueprint.go | 57 +++++++++++ plugins/core/plugin_task.go | 13 +++ services/blueprint.go | 104 ++++++++++++++++++--- services/blueprint_test.go | 94 +++++++++++++++++++ services/pipeline.go | 19 ++-- test/api/task/task_test.go | 2 +- 13 files changed, 363 insertions(+), 33 deletions(-) diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go index 72aafd3e..a189d27c 100644 --- a/impl/dalgorm/dalgorm.go +++ b/impl/dalgorm/dalgorm.go @@ -109,7 +109,11 @@ func (d *Dalgorm) All(dst interface{}, clauses ...dal.Clause) error { // First loads first matched row from database to `dst`, error will be returned if no records were found func (d *Dalgorm) First(dst interface{}, clauses ...dal.Clause) error { - return buildTx(d.db, clauses).First(dst).Error + err := buildTx(d.db, clauses).First(dst).Error + if err == gorm.ErrRecordNotFound { + return dal.ErrRecordNotFound + } + return err } // Count total records diff --git a/models/blueprint.go b/models/blueprint.go index 9ae9052e..d2333afa 100644 --- a/models/blueprint.go +++ b/models/blueprint.go @@ -18,6 +18,8 @@ limitations under the License. package models import ( + "encoding/json" + "github.com/apache/incubator-devlake/models/common" "gorm.io/datatypes" ) @@ -28,13 +30,19 @@ const BLUEPRINT_MODE_ADVANCED = "ADVANCED" type Blueprint struct { Name string `json:"name" validate:"required"` Mode string `json:"mode" gorm:"varchar(20)" validate:"required,oneof=NORMAL ADVANCED"` - Tasks datatypes.JSON `json:"tasks"` + Plan datatypes.JSON `json:"plan"` Enable bool `json:"enable"` CronConfig string `json:"cronConfig"` IsManual bool `json:"isManual"` + Settings datatypes.JSON `json:"settings"` common.Model } func (Blueprint) TableName() string { return "_devlake_blueprints" } + +type BlueprintSettings struct { + Version string `json:"version" validate:"required,semver,oneof=1.0.0"` + Connections json.RawMessage `json:"connections" validate:"required"` +} diff --git a/models/migrationscripts/220622-blueprint-normal-mode.go b/models/migrationscripts/220622-blueprint-normal-mode.go new file mode 100644 index 00000000..7ba66253 --- /dev/null +++ b/models/migrationscripts/220622-blueprint-normal-mode.go @@ -0,0 +1,69 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package migrationscripts + +import ( + "context" + + "gorm.io/datatypes" + "gorm.io/gorm" +) + +// model blueprint +type blueprintNormalMode_Blueprint struct { + Settings datatypes.JSON `json:"settings"` +} + +func (blueprintNormalMode_Blueprint) TableName() string { + return "_devlake_blueprints" +} + +// model pipeline +type blueprintNormalMode_Pipeline struct { +} + +func (blueprintNormalMode_Pipeline) TableName() string { + return "_devlake_pipelines" +} + +// migration script +type blueprintNormalMode struct{} + +func (*blueprintNormalMode) Up(ctx context.Context, db *gorm.DB) error { + err := db.Migrator().AutoMigrate(&blueprintNormalMode_Blueprint{}) + if err != nil { + return err + } + err = db.Migrator().RenameColumn(&blueprintNormalMode_Blueprint{}, "tasks", "plan") + if err != nil { + return err + } + err = db.Migrator().RenameColumn(&blueprintNormalMode_Pipeline{}, "tasks", "plan") + if err != nil { + return err + } + return nil +} + +func (*blueprintNormalMode) Version() uint64 { + return 20220622110537 +} + +func (*blueprintNormalMode) Name() string { + return "blueprint normal mode support" +} diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go index 3f1c1dec..9f39472c 100644 --- a/models/migrationscripts/register.go +++ b/models/migrationscripts/register.go @@ -28,5 +28,6 @@ func All() []migration.Script { new(updateSchemas20220527), new(updateSchemas20220528), new(updateSchemas20220601), new(updateSchemas20220602), new(updateSchemas20220612), new(updateSchemas20220613), new(updateSchemas20220614), new(updateSchemas2022061402), new(updateSchemas20220616), + new(blueprintNormalMode), } } diff --git a/models/pipeline.go b/models/pipeline.go index 6a271ac7..3f8d882d 100644 --- a/models/pipeline.go +++ b/models/pipeline.go @@ -21,6 +21,7 @@ import ( "time" "github.com/apache/incubator-devlake/models/common" + "github.com/apache/incubator-devlake/plugins/core" "gorm.io/datatypes" ) @@ -29,7 +30,7 @@ type Pipeline struct { common.Model Name string `json:"name" gorm:"index"` BlueprintId uint64 `json:"blueprintId"` - Tasks datatypes.JSON `json:"tasks"` + Plan datatypes.JSON `json:"plan"` TotalTasks int `json:"totalTasks"` FinishedTasks int `json:"finishedTasks"` BeganAt *time.Time `json:"beganAt"` @@ -43,8 +44,8 @@ type Pipeline struct { // We use a 2D array because the request body must be an array of a set of tasks // to be executed concurrently, while each set is to be executed sequentially. type NewPipeline struct { - Name string `json:"name"` - Tasks [][]*NewTask `json:"tasks"` + Name string `json:"name"` + Plan core.PipelinePlan `json:"plan"` BlueprintId uint64 } diff --git a/models/task.go b/models/task.go index d4e32420..9f78d6cf 100644 --- a/models/task.go +++ b/models/task.go @@ -21,6 +21,7 @@ import ( "time" "github.com/apache/incubator-devlake/models/common" + "github.com/apache/incubator-devlake/plugins/core" "gorm.io/datatypes" ) @@ -61,12 +62,10 @@ type Task struct { type NewTask struct { // Plugin name - Plugin string `json:"plugin" binding:"required"` - Subtasks []string `json:"subtasks"` - Options map[string]interface{} `json:"options"` - PipelineId uint64 `json:"-"` - PipelineRow int `json:"-"` - PipelineCol int `json:"-"` + *core.PipelineTask + PipelineId uint64 `json:"-"` + PipelineRow int `json:"-"` + PipelineCol int `json:"-"` } func (Task) TableName() string { diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go index a8790860..90be4760 100644 --- a/plugins/core/dal/dal.go +++ b/plugins/core/dal/dal.go @@ -19,6 +19,7 @@ package dal import ( "database/sql" + "errors" ) type Clause struct { @@ -129,3 +130,5 @@ const HavingClause string = "Having" func Having(clause string, params ...interface{}) Clause { return Clause{Type: HavingClause, Data: DalClause{clause, params}} } + +var ErrRecordNotFound = errors.New("record not found") diff --git a/plugins/core/plugin_blueprint.go b/plugins/core/plugin_blueprint.go new file mode 100644 index 00000000..5f1f3a82 --- /dev/null +++ b/plugins/core/plugin_blueprint.go @@ -0,0 +1,57 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import "encoding/json" + +// PluginBlueprint is used to support Blueprint Normal model +type PluginBlueprintV100 interface { + // MakePipelinePlan generates `pipeline.tasks` based on `version` and `scope` + // + // `version` semver from `blueprint.settings.version` + // `scope` arbitrary json.RawMessage, depends on `version`, for v0.0.1, it is an Array of Objects + MakePipelinePlan(connectionId uint64, scope []*BlueprintScopeV100) (PipelinePlan, error) +} + +// BlueprintConnectionV100 is the connection definition for protocol v1.0.0 +type BlueprintConnectionV100 struct { + Plugin string `json:"plugin" validate:"required"` + ConnectionId uint64 `json:"connectionId" validate:"required"` + Scope []*BlueprintScopeV100 `json:"scope" validate:"required"` +} + +// BlueprintScopeV100 is the scope definition for protocol v1.0.0 +type BlueprintScopeV100 struct { + Entities []string `json:"entities"` + Options json.RawMessage `json:"options"` + Transformation json.RawMessage `json:"transformation"` +} + +// PipelineTask represents a smallest unit of execution inside a PipelinePlan +type PipelineTask struct { + // Plugin name + Plugin string `json:"plugin" binding:"required"` + Subtasks []string `json:"subtasks"` + Options map[string]interface{} `json:"options"` +} + +// PipelineStage consist of multiple PipelineTasks, they will be executed in parallel +type PipelineStage []*PipelineTask + +// PipelinePlan consist of multiple PipelineStages, they will be executed in sequential order +type PipelinePlan []PipelineStage diff --git a/plugins/core/plugin_task.go b/plugins/core/plugin_task.go index 6dd1b095..193723e1 100644 --- a/plugins/core/plugin_task.go +++ b/plugins/core/plugin_task.go @@ -80,6 +80,18 @@ type SubTask interface { // All subtasks from plugins should comply to this prototype, so they could be orchestrated by framework type SubTaskEntryPoint func(c SubTaskContext) error +const DOMAIN_TYPE_CODE = "CODE" +const DOMAIN_TYPE_TICKET = "TICKET" +const DOMAIN_TYPE_CICD = "CICD" +const DOMAIN_TYPE_CROSS = "CROSS" + +var DOMAIN_TYPES = []string{ + DOMAIN_TYPE_CODE, + DOMAIN_TYPE_TICKET, + DOMAIN_TYPE_CICD, + DOMAIN_TYPE_CROSS, +} + // Meta data of a subtask type SubTaskMeta struct { Name string @@ -88,6 +100,7 @@ type SubTaskMeta struct { Required bool EnabledByDefault bool Description string + DomainTypes []string } // Implement this interface to let framework run tasks for you diff --git a/services/blueprint.go b/services/blueprint.go index 0be64d7c..f6b6eb8c 100644 --- a/services/blueprint.go +++ b/services/blueprint.go @@ -24,9 +24,11 @@ import ( "github.com/apache/incubator-devlake/errors" "github.com/apache/incubator-devlake/logger" "github.com/apache/incubator-devlake/models" + "github.com/apache/incubator-devlake/plugins/core" "github.com/go-playground/validator/v10" "github.com/mitchellh/mapstructure" "github.com/robfig/cron/v3" + "gorm.io/datatypes" "gorm.io/gorm" ) @@ -91,10 +93,6 @@ func GetBlueprint(blueprintId uint64) (*models.Blueprint, error) { } func validateBlueprint(blueprint *models.Blueprint) error { - // TODO: implement NORMAL mode - if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL { - return fmt.Errorf("NORMAL mode is yet to be implemented") - } // validation err := vld.Struct(blueprint) if err != nil { @@ -109,17 +107,21 @@ func validateBlueprint(blueprint *models.Blueprint) error { return fmt.Errorf("cronConfig is required for Automated blueprint") } if blueprint.Mode == models.BLUEPRINT_MODE_ADVANCED { - tasks := make([][]models.NewTask, 0) - err = json.Unmarshal(blueprint.Tasks, &tasks) + plan := make(core.PipelinePlan, 0) + err = json.Unmarshal(blueprint.Plan, &plan) if err != nil { - return fmt.Errorf("invalid tasks: %w", err) + return fmt.Errorf("invalid plan: %w", err) } // tasks should not be empty - if len(tasks) == 0 || len(tasks[0]) == 0 { - return fmt.Errorf("empty tasks") + if len(plan) == 0 || len(plan[0]) == 0 { + return fmt.Errorf("empty plan") + } + } else if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL { + blueprint.Plan, err = GeneratePlanJson(blueprint.Settings) + if err != nil { + return fmt.Errorf("invalid plan: %w", err) } } - // TODO: validate each of every task object return nil } @@ -182,8 +184,19 @@ func ReloadBlueprints(c *cron.Cron) error { } c.Stop() for _, pp := range blueprints { - var tasks [][]*models.NewTask - err = json.Unmarshal(pp.Tasks, &tasks) + if pp.Mode == models.BLUEPRINT_MODE_NORMAL { + // for NORMAL mode, we have to generate the actual pipeline plan beforehand + pp.Plan, err = GeneratePlanJson(pp.Settings) + if err != nil { + return err + } + err = db.Save(pp).Error + if err != nil { + return err + } + } + var plan core.PipelinePlan + err = json.Unmarshal(pp.Plan, &plan) if err != nil { blueprintLog.Error("created cron job failed: %s", err) return err @@ -191,7 +204,7 @@ func ReloadBlueprints(c *cron.Cron) error { blueprint := pp _, err := c.AddFunc(pp.CronConfig, func() { newPipeline := models.NewPipeline{} - newPipeline.Tasks = tasks + newPipeline.Plan = plan newPipeline.Name = blueprint.Name newPipeline.BlueprintId = blueprint.ID pipeline, err := CreatePipeline(&newPipeline) @@ -218,3 +231,68 @@ func ReloadBlueprints(c *cron.Cron) error { log.Info("total %d blueprints were scheduled", len(blueprints)) return nil } + +// GeneratePlan generates pipeline plan by version +func GeneratePlanJson(settings datatypes.JSON) (datatypes.JSON, error) { + bpSettings := new(models.BlueprintSettings) + err := json.Unmarshal(settings, bpSettings) + if err != nil { + return nil, err + } + var plan interface{} + switch bpSettings.Version { + case "1.0.0": + plan, err = GeneratePlanJsonV100(bpSettings) + default: + return nil, fmt.Errorf("unknown version of blueprint settings: %s", bpSettings.Version) + } + if err != nil { + return nil, err + } + return json.Marshal(plan) +} + +// GenerateTasksBySettingsV100 generates pipeline plan according v1.0.0 definition +func GeneratePlanJsonV100(settings *models.BlueprintSettings) (core.PipelinePlan, error) { + connections := make([]*core.BlueprintConnectionV100, 0) + err := json.Unmarshal(settings.Connections, &connections) + if err != nil { + return nil, err + } + plans := make([]core.PipelinePlan, len(connections)) + for i, connection := range connections { + if len(connection.Scope) == 0 { + return nil, fmt.Errorf("connections[%d].scope is empty", i) + } + plugin, err := core.GetPlugin(connection.Plugin) + if err != nil { + return nil, err + } + if pluginBp, ok := plugin.(core.PluginBlueprintV100); ok { + plans[i], err = pluginBp.MakePipelinePlan(connection.ConnectionId, connection.Scope) + if err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("plugin %s does not support blueprint protocol version 1.0.0", connection.Plugin) + } + } + return MergePipelinePlans(plans...), nil +} + +// MergePipelinePlans merges multiple pipelines into one unified pipeline +func MergePipelinePlans(plans ...core.PipelinePlan) core.PipelinePlan { + merged := make(core.PipelinePlan, 0) + // iterate all pipelineTasks and try to merge them into `merged` + for _, plan := range plans { + // add all stages from plan to merged + for index, stage := range plan { + if index >= len(merged) { + merged = append(merged, nil) + } + // add all tasks from plan to target respectively + merged[index] = append(merged[index], stage...) + } + } + return merged +} diff --git a/services/blueprint_test.go b/services/blueprint_test.go new file mode 100644 index 00000000..32c08331 --- /dev/null +++ b/services/blueprint_test.go @@ -0,0 +1,94 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package services + +import ( + "testing" + + "github.com/apache/incubator-devlake/models" + "github.com/stretchr/testify/assert" +) + +func TestMergePipelineTasks(t *testing.T) { + plan1 := models.PipelinePlan{ + []*models.NewTask{ + {Plugin: "github"}, + {Plugin: "gitlab"}, + }, + []*models.NewTask{ + {Plugin: "gitextractor1"}, + {Plugin: "gitextractor2"}, + }, + } + + plan2 := models.PipelinePlan{ + []*models.NewTask{ + {Plugin: "jira"}, + }, + } + + plan3 := models.PipelinePlan{ + []*models.NewTask{ + {Plugin: "jenkins"}, + }, + []*models.NewTask{ + {Plugin: "jenkins"}, + }, + []*models.NewTask{ + {Plugin: "jenkins"}, + }, + } + + assert.Equal(t, plan1, MergePipelinePlans(plan1)) + assert.Equal(t, plan2, MergePipelinePlans(plan2)) + assert.Equal( + t, + models.PipelinePlan{ + []*models.NewTask{ + {Plugin: "github"}, + {Plugin: "gitlab"}, + {Plugin: "jira"}, + }, + []*models.NewTask{ + {Plugin: "gitextractor1"}, + {Plugin: "gitextractor2"}, + }, + }, + MergePipelinePlans(plan1, plan2), + ) + assert.Equal( + t, + models.PipelinePlan{ + []*models.NewTask{ + {Plugin: "github"}, + {Plugin: "gitlab"}, + {Plugin: "jira"}, + {Plugin: "jenkins"}, + }, + []*models.NewTask{ + {Plugin: "gitextractor1"}, + {Plugin: "gitextractor2"}, + {Plugin: "jenkins"}, + }, + []*models.NewTask{ + {Plugin: "jenkins"}, + }, + }, + MergePipelinePlans(plan1, plan2, plan3), + ) +} diff --git a/services/pipeline.go b/services/pipeline.go index 8d7a8dd0..d255ec56 100644 --- a/services/pipeline.go +++ b/services/pipeline.go @@ -99,12 +99,15 @@ func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, error) { } // create tasks accordingly - for i := range newPipeline.Tasks { - for j := range newPipeline.Tasks[i] { - newTask := newPipeline.Tasks[i][j] - newTask.PipelineId = pipeline.ID - newTask.PipelineRow = i + 1 - newTask.PipelineCol = j + 1 + for i := range newPipeline.Plan { + for j := range newPipeline.Plan[i] { + pipelineTask := newPipeline.Plan[i][j] + newTask := &models.NewTask{ + PipelineTask: pipelineTask, + PipelineId: pipeline.ID, + PipelineRow: i + 1, + PipelineCol: j + 1, + } _, err := CreateTask(newTask) if err != nil { pipelineLog.Error("create task for pipeline failed: %w", err) @@ -123,13 +126,13 @@ func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, error) { } // update tasks state - pipeline.Tasks, err = json.Marshal(newPipeline.Tasks) + pipeline.Plan, err = json.Marshal(newPipeline.Plan) if err != nil { return nil, err } err = db.Model(pipeline).Updates(map[string]interface{}{ "total_tasks": pipeline.TotalTasks, - "tasks": pipeline.Tasks, + "plan": pipeline.Plan, }).Error if err != nil { pipelineLog.Error("update pipline state failed: %w", err) diff --git a/test/api/task/task_test.go b/test/api/task/task_test.go index 1cc712d4..219af2dd 100644 --- a/test/api/task/task_test.go +++ b/test/api/task/task_test.go @@ -68,7 +68,7 @@ func TestNewTask(t *testing.T) { assert.Equal(t, pipeline.Name, "hello") var tasks [][]*models.NewTask - err = json.Unmarshal(pipeline.Tasks, &tasks) + err = json.Unmarshal(pipeline.Plan, &tasks) if err != nil { t.Fatal(err) }
