This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit 1de8a6b3e85f390f1123671ce36581d4eb324675
Author: Klesh Wong <[email protected]>
AuthorDate: Wed Jun 22 22:43:32 2022 +0800

    feat: blueprint normal mode framework-side
---
 impl/dalgorm/dalgorm.go                            |   6 +-
 models/blueprint.go                                |  10 +-
 .../220622-blueprint-normal-mode.go                |  69 ++++++++++++++
 models/migrationscripts/register.go                |   1 +
 models/pipeline.go                                 |   7 +-
 models/task.go                                     |  11 +--
 plugins/core/dal/dal.go                            |   3 +
 plugins/core/plugin_blueprint.go                   |  57 +++++++++++
 plugins/core/plugin_task.go                        |  13 +++
 services/blueprint.go                              | 104 ++++++++++++++++++---
 services/blueprint_test.go                         |  94 +++++++++++++++++++
 services/pipeline.go                               |  19 ++--
 test/api/task/task_test.go                         |   2 +-
 13 files changed, 363 insertions(+), 33 deletions(-)

diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 72aafd3e..a189d27c 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -109,7 +109,11 @@ func (d *Dalgorm) All(dst interface{}, clauses 
...dal.Clause) error {
 
 // First loads first matched row from database to `dst`, error will be 
returned if no records were found
 func (d *Dalgorm) First(dst interface{}, clauses ...dal.Clause) error {
-       return buildTx(d.db, clauses).First(dst).Error
+       err := buildTx(d.db, clauses).First(dst).Error
+       if err == gorm.ErrRecordNotFound {
+               return dal.ErrRecordNotFound
+       }
+       return err
 }
 
 // Count total records
diff --git a/models/blueprint.go b/models/blueprint.go
index 9ae9052e..d2333afa 100644
--- a/models/blueprint.go
+++ b/models/blueprint.go
@@ -18,6 +18,8 @@ limitations under the License.
 package models
 
 import (
+       "encoding/json"
+
        "github.com/apache/incubator-devlake/models/common"
        "gorm.io/datatypes"
 )
@@ -28,13 +30,19 @@ const BLUEPRINT_MODE_ADVANCED = "ADVANCED"
 type Blueprint struct {
        Name       string         `json:"name" validate:"required"`
        Mode       string         `json:"mode" gorm:"varchar(20)" 
validate:"required,oneof=NORMAL ADVANCED"`
-       Tasks      datatypes.JSON `json:"tasks"`
+       Plan       datatypes.JSON `json:"plan"`
        Enable     bool           `json:"enable"`
        CronConfig string         `json:"cronConfig"`
        IsManual   bool           `json:"isManual"`
+       Settings   datatypes.JSON `json:"settings"`
        common.Model
 }
 
 func (Blueprint) TableName() string {
        return "_devlake_blueprints"
 }
+
+type BlueprintSettings struct {
+       Version     string          `json:"version" 
validate:"required,semver,oneof=1.0.0"`
+       Connections json.RawMessage `json:"connections" validate:"required"`
+}
diff --git a/models/migrationscripts/220622-blueprint-normal-mode.go 
b/models/migrationscripts/220622-blueprint-normal-mode.go
new file mode 100644
index 00000000..7ba66253
--- /dev/null
+++ b/models/migrationscripts/220622-blueprint-normal-mode.go
@@ -0,0 +1,69 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+       "context"
+
+       "gorm.io/datatypes"
+       "gorm.io/gorm"
+)
+
+// model blueprint
+type blueprintNormalMode_Blueprint struct {
+       Settings datatypes.JSON `json:"settings"`
+}
+
+func (blueprintNormalMode_Blueprint) TableName() string {
+       return "_devlake_blueprints"
+}
+
+// model pipeline
+type blueprintNormalMode_Pipeline struct {
+}
+
+func (blueprintNormalMode_Pipeline) TableName() string {
+       return "_devlake_pipelines"
+}
+
+// migration script
+type blueprintNormalMode struct{}
+
+func (*blueprintNormalMode) Up(ctx context.Context, db *gorm.DB) error {
+       err := db.Migrator().AutoMigrate(&blueprintNormalMode_Blueprint{})
+       if err != nil {
+               return err
+       }
+       err = db.Migrator().RenameColumn(&blueprintNormalMode_Blueprint{}, 
"tasks", "plan")
+       if err != nil {
+               return err
+       }
+       err = db.Migrator().RenameColumn(&blueprintNormalMode_Pipeline{}, 
"tasks", "plan")
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+func (*blueprintNormalMode) Version() uint64 {
+       return 20220622110537
+}
+
+func (*blueprintNormalMode) Name() string {
+       return "blueprint normal mode support"
+}
diff --git a/models/migrationscripts/register.go 
b/models/migrationscripts/register.go
index 3f1c1dec..9f39472c 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -28,5 +28,6 @@ func All() []migration.Script {
                new(updateSchemas20220527), new(updateSchemas20220528), 
new(updateSchemas20220601),
                new(updateSchemas20220602), new(updateSchemas20220612), 
new(updateSchemas20220613),
                new(updateSchemas20220614), new(updateSchemas2022061402), 
new(updateSchemas20220616),
+               new(blueprintNormalMode),
        }
 }
diff --git a/models/pipeline.go b/models/pipeline.go
index 6a271ac7..3f8d882d 100644
--- a/models/pipeline.go
+++ b/models/pipeline.go
@@ -21,6 +21,7 @@ import (
        "time"
 
        "github.com/apache/incubator-devlake/models/common"
+       "github.com/apache/incubator-devlake/plugins/core"
 
        "gorm.io/datatypes"
 )
@@ -29,7 +30,7 @@ type Pipeline struct {
        common.Model
        Name          string         `json:"name" gorm:"index"`
        BlueprintId   uint64         `json:"blueprintId"`
-       Tasks         datatypes.JSON `json:"tasks"`
+       Plan          datatypes.JSON `json:"plan"`
        TotalTasks    int            `json:"totalTasks"`
        FinishedTasks int            `json:"finishedTasks"`
        BeganAt       *time.Time     `json:"beganAt"`
@@ -43,8 +44,8 @@ type Pipeline struct {
 // We use a 2D array because the request body must be an array of a set of 
tasks
 // to be executed concurrently, while each set is to be executed sequentially.
 type NewPipeline struct {
-       Name        string       `json:"name"`
-       Tasks       [][]*NewTask `json:"tasks"`
+       Name        string            `json:"name"`
+       Plan        core.PipelinePlan `json:"plan"`
        BlueprintId uint64
 }
 
diff --git a/models/task.go b/models/task.go
index d4e32420..9f78d6cf 100644
--- a/models/task.go
+++ b/models/task.go
@@ -21,6 +21,7 @@ import (
        "time"
 
        "github.com/apache/incubator-devlake/models/common"
+       "github.com/apache/incubator-devlake/plugins/core"
        "gorm.io/datatypes"
 )
 
@@ -61,12 +62,10 @@ type Task struct {
 
 type NewTask struct {
        // Plugin name
-       Plugin      string                 `json:"plugin" binding:"required"`
-       Subtasks    []string               `json:"subtasks"`
-       Options     map[string]interface{} `json:"options"`
-       PipelineId  uint64                 `json:"-"`
-       PipelineRow int                    `json:"-"`
-       PipelineCol int                    `json:"-"`
+       *core.PipelineTask
+       PipelineId  uint64 `json:"-"`
+       PipelineRow int    `json:"-"`
+       PipelineCol int    `json:"-"`
 }
 
 func (Task) TableName() string {
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index a8790860..90be4760 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -19,6 +19,7 @@ package dal
 
 import (
        "database/sql"
+       "errors"
 )
 
 type Clause struct {
@@ -129,3 +130,5 @@ const HavingClause string = "Having"
 func Having(clause string, params ...interface{}) Clause {
        return Clause{Type: HavingClause, Data: DalClause{clause, params}}
 }
+
+var ErrRecordNotFound = errors.New("record not found")
diff --git a/plugins/core/plugin_blueprint.go b/plugins/core/plugin_blueprint.go
new file mode 100644
index 00000000..5f1f3a82
--- /dev/null
+++ b/plugins/core/plugin_blueprint.go
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package core
+
+import "encoding/json"
+
+// PluginBlueprintV100 is used to support Blueprint Normal mode
+type PluginBlueprintV100 interface {
+       // MakePipelinePlan generates a `PipelinePlan` based on `connectionId` and `scope`
+       //
+       // `connectionId` is the `connectionId` of a `BlueprintConnectionV100`
+       // `scope` is the `scope` of that connection, an Array of `BlueprintScopeV100`
+       MakePipelinePlan(connectionId uint64, scope []*BlueprintScopeV100) 
(PipelinePlan, error)
+}
+
+// BlueprintConnectionV100 is the connection definition for protocol v1.0.0
+type BlueprintConnectionV100 struct {
+       Plugin       string                `json:"plugin" validate:"required"`
+       ConnectionId uint64                `json:"connectionId" 
validate:"required"`
+       Scope        []*BlueprintScopeV100 `json:"scope" validate:"required"`
+}
+
+// BlueprintScopeV100 is the scope definition for protocol v1.0.0
+type BlueprintScopeV100 struct {
+       Entities       []string        `json:"entities"`
+       Options        json.RawMessage `json:"options"`
+       Transformation json.RawMessage `json:"transformation"`
+}
+
+// PipelineTask represents the smallest unit of execution inside a PipelinePlan
+type PipelineTask struct {
+       // Plugin name
+       Plugin   string                 `json:"plugin" binding:"required"`
+       Subtasks []string               `json:"subtasks"`
+       Options  map[string]interface{} `json:"options"`
+}
+
+// PipelineStage consists of multiple PipelineTasks, which will be executed in parallel
+type PipelineStage []*PipelineTask
+
+// PipelinePlan consists of multiple PipelineStages, which will be executed in sequential order
+type PipelinePlan []PipelineStage
diff --git a/plugins/core/plugin_task.go b/plugins/core/plugin_task.go
index 6dd1b095..193723e1 100644
--- a/plugins/core/plugin_task.go
+++ b/plugins/core/plugin_task.go
@@ -80,6 +80,18 @@ type SubTask interface {
 // All subtasks from plugins should comply to this prototype, so they could be 
orchestrated by framework
 type SubTaskEntryPoint func(c SubTaskContext) error
 
+const DOMAIN_TYPE_CODE = "CODE"
+const DOMAIN_TYPE_TICKET = "TICKET"
+const DOMAIN_TYPE_CICD = "CICD"
+const DOMAIN_TYPE_CROSS = "CROSS"
+
+var DOMAIN_TYPES = []string{
+       DOMAIN_TYPE_CODE,
+       DOMAIN_TYPE_TICKET,
+       DOMAIN_TYPE_CICD,
+       DOMAIN_TYPE_CROSS,
+}
+
 // Meta data of a subtask
 type SubTaskMeta struct {
        Name       string
@@ -88,6 +100,7 @@ type SubTaskMeta struct {
        Required         bool
        EnabledByDefault bool
        Description      string
+       DomainTypes      []string
 }
 
 // Implement this interface to let framework run tasks for you
diff --git a/services/blueprint.go b/services/blueprint.go
index 0be64d7c..f6b6eb8c 100644
--- a/services/blueprint.go
+++ b/services/blueprint.go
@@ -24,9 +24,11 @@ import (
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/logger"
        "github.com/apache/incubator-devlake/models"
+       "github.com/apache/incubator-devlake/plugins/core"
        "github.com/go-playground/validator/v10"
        "github.com/mitchellh/mapstructure"
        "github.com/robfig/cron/v3"
+       "gorm.io/datatypes"
        "gorm.io/gorm"
 )
 
@@ -91,10 +93,6 @@ func GetBlueprint(blueprintId uint64) (*models.Blueprint, 
error) {
 }
 
 func validateBlueprint(blueprint *models.Blueprint) error {
-       // TODO: implement NORMAL mode
-       if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL {
-               return fmt.Errorf("NORMAL mode is yet to be implemented")
-       }
        // validation
        err := vld.Struct(blueprint)
        if err != nil {
@@ -109,17 +107,21 @@ func validateBlueprint(blueprint *models.Blueprint) error 
{
                return fmt.Errorf("cronConfig is required for Automated 
blueprint")
        }
        if blueprint.Mode == models.BLUEPRINT_MODE_ADVANCED {
-               tasks := make([][]models.NewTask, 0)
-               err = json.Unmarshal(blueprint.Tasks, &tasks)
+               plan := make(core.PipelinePlan, 0)
+               err = json.Unmarshal(blueprint.Plan, &plan)
                if err != nil {
-                       return fmt.Errorf("invalid tasks: %w", err)
+                       return fmt.Errorf("invalid plan: %w", err)
                }
                // tasks should not be empty
-               if len(tasks) == 0 || len(tasks[0]) == 0 {
-                       return fmt.Errorf("empty tasks")
+               if len(plan) == 0 || len(plan[0]) == 0 {
+                       return fmt.Errorf("empty plan")
+               }
+       } else if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL {
+               blueprint.Plan, err = GeneratePlanJson(blueprint.Settings)
+               if err != nil {
+                       return fmt.Errorf("invalid plan: %w", err)
                }
        }
-       // TODO: validate each of every task object
        return nil
 }
 
@@ -182,8 +184,19 @@ func ReloadBlueprints(c *cron.Cron) error {
        }
        c.Stop()
        for _, pp := range blueprints {
-               var tasks [][]*models.NewTask
-               err = json.Unmarshal(pp.Tasks, &tasks)
+               if pp.Mode == models.BLUEPRINT_MODE_NORMAL {
+                       // for NORMAL mode, we have to generate the actual 
pipeline plan beforehand
+                       pp.Plan, err = GeneratePlanJson(pp.Settings)
+                       if err != nil {
+                               return err
+                       }
+                       err = db.Save(pp).Error
+                       if err != nil {
+                               return err
+                       }
+               }
+               var plan core.PipelinePlan
+               err = json.Unmarshal(pp.Plan, &plan)
                if err != nil {
                        blueprintLog.Error("created cron job failed: %s", err)
                        return err
@@ -191,7 +204,7 @@ func ReloadBlueprints(c *cron.Cron) error {
                blueprint := pp
                _, err := c.AddFunc(pp.CronConfig, func() {
                        newPipeline := models.NewPipeline{}
-                       newPipeline.Tasks = tasks
+                       newPipeline.Plan = plan
                        newPipeline.Name = blueprint.Name
                        newPipeline.BlueprintId = blueprint.ID
                        pipeline, err := CreatePipeline(&newPipeline)
@@ -218,3 +231,68 @@ func ReloadBlueprints(c *cron.Cron) error {
        log.Info("total %d blueprints were scheduled", len(blueprints))
        return nil
 }
+
+// GeneratePlanJson generates pipeline plan by version
+func GeneratePlanJson(settings datatypes.JSON) (datatypes.JSON, error) {
+       bpSettings := new(models.BlueprintSettings)
+       err := json.Unmarshal(settings, bpSettings)
+       if err != nil {
+               return nil, err
+       }
+       var plan interface{}
+       switch bpSettings.Version {
+       case "1.0.0":
+               plan, err = GeneratePlanJsonV100(bpSettings)
+       default:
+               return nil, fmt.Errorf("unknown version of blueprint settings: 
%s", bpSettings.Version)
+       }
+       if err != nil {
+               return nil, err
+       }
+       return json.Marshal(plan)
+}
+
+// GeneratePlanJsonV100 generates pipeline plan according to v1.0.0 definition
+func GeneratePlanJsonV100(settings *models.BlueprintSettings) 
(core.PipelinePlan, error) {
+       connections := make([]*core.BlueprintConnectionV100, 0)
+       err := json.Unmarshal(settings.Connections, &connections)
+       if err != nil {
+               return nil, err
+       }
+       plans := make([]core.PipelinePlan, len(connections))
+       for i, connection := range connections {
+               if len(connection.Scope) == 0 {
+                       return nil, fmt.Errorf("connections[%d].scope is 
empty", i)
+               }
+               plugin, err := core.GetPlugin(connection.Plugin)
+               if err != nil {
+                       return nil, err
+               }
+               if pluginBp, ok := plugin.(core.PluginBlueprintV100); ok {
+                       plans[i], err = 
pluginBp.MakePipelinePlan(connection.ConnectionId, connection.Scope)
+                       if err != nil {
+                               return nil, err
+                       }
+               } else {
+                       return nil, fmt.Errorf("plugin %s does not support 
blueprint protocol version 1.0.0", connection.Plugin)
+               }
+       }
+       return MergePipelinePlans(plans...), nil
+}
+
+// MergePipelinePlans merges multiple pipelines into one unified pipeline
+func MergePipelinePlans(plans ...core.PipelinePlan) core.PipelinePlan {
+       merged := make(core.PipelinePlan, 0)
+       // iterate all plans and merge their stages into `merged`
+       for _, plan := range plans {
+               // add all stages from plan to merged
+               for index, stage := range plan {
+                       if index >= len(merged) {
+                               merged = append(merged, nil)
+                       }
+                       // append all tasks of this stage to the stage at the same index in `merged`
+                       merged[index] = append(merged[index], stage...)
+               }
+       }
+       return merged
+}
diff --git a/services/blueprint_test.go b/services/blueprint_test.go
new file mode 100644
index 00000000..32c08331
--- /dev/null
+++ b/services/blueprint_test.go
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+       "testing"
+
+       "github.com/apache/incubator-devlake/models"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestMergePipelineTasks(t *testing.T) {
+       plan1 := models.PipelinePlan{
+               []*models.NewTask{
+                       {Plugin: "github"},
+                       {Plugin: "gitlab"},
+               },
+               []*models.NewTask{
+                       {Plugin: "gitextractor1"},
+                       {Plugin: "gitextractor2"},
+               },
+       }
+
+       plan2 := models.PipelinePlan{
+               []*models.NewTask{
+                       {Plugin: "jira"},
+               },
+       }
+
+       plan3 := models.PipelinePlan{
+               []*models.NewTask{
+                       {Plugin: "jenkins"},
+               },
+               []*models.NewTask{
+                       {Plugin: "jenkins"},
+               },
+               []*models.NewTask{
+                       {Plugin: "jenkins"},
+               },
+       }
+
+       assert.Equal(t, plan1, MergePipelinePlans(plan1))
+       assert.Equal(t, plan2, MergePipelinePlans(plan2))
+       assert.Equal(
+               t,
+               models.PipelinePlan{
+                       []*models.NewTask{
+                               {Plugin: "github"},
+                               {Plugin: "gitlab"},
+                               {Plugin: "jira"},
+                       },
+                       []*models.NewTask{
+                               {Plugin: "gitextractor1"},
+                               {Plugin: "gitextractor2"},
+                       },
+               },
+               MergePipelinePlans(plan1, plan2),
+       )
+       assert.Equal(
+               t,
+               models.PipelinePlan{
+                       []*models.NewTask{
+                               {Plugin: "github"},
+                               {Plugin: "gitlab"},
+                               {Plugin: "jira"},
+                               {Plugin: "jenkins"},
+                       },
+                       []*models.NewTask{
+                               {Plugin: "gitextractor1"},
+                               {Plugin: "gitextractor2"},
+                               {Plugin: "jenkins"},
+                       },
+                       []*models.NewTask{
+                               {Plugin: "jenkins"},
+                       },
+               },
+               MergePipelinePlans(plan1, plan2, plan3),
+       )
+}
diff --git a/services/pipeline.go b/services/pipeline.go
index 8d7a8dd0..d255ec56 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -99,12 +99,15 @@ func CreatePipeline(newPipeline *models.NewPipeline) 
(*models.Pipeline, error) {
        }
 
        // create tasks accordingly
-       for i := range newPipeline.Tasks {
-               for j := range newPipeline.Tasks[i] {
-                       newTask := newPipeline.Tasks[i][j]
-                       newTask.PipelineId = pipeline.ID
-                       newTask.PipelineRow = i + 1
-                       newTask.PipelineCol = j + 1
+       for i := range newPipeline.Plan {
+               for j := range newPipeline.Plan[i] {
+                       pipelineTask := newPipeline.Plan[i][j]
+                       newTask := &models.NewTask{
+                               PipelineTask: pipelineTask,
+                               PipelineId:   pipeline.ID,
+                               PipelineRow:  i + 1,
+                               PipelineCol:  j + 1,
+                       }
                        _, err := CreateTask(newTask)
                        if err != nil {
                                pipelineLog.Error("create task for pipeline 
failed: %w", err)
@@ -123,13 +126,13 @@ func CreatePipeline(newPipeline *models.NewPipeline) 
(*models.Pipeline, error) {
        }
 
        // update tasks state
-       pipeline.Tasks, err = json.Marshal(newPipeline.Tasks)
+       pipeline.Plan, err = json.Marshal(newPipeline.Plan)
        if err != nil {
                return nil, err
        }
        err = db.Model(pipeline).Updates(map[string]interface{}{
                "total_tasks": pipeline.TotalTasks,
-               "tasks":       pipeline.Tasks,
+               "plan":        pipeline.Plan,
        }).Error
        if err != nil {
                pipelineLog.Error("update pipline state failed: %w", err)
diff --git a/test/api/task/task_test.go b/test/api/task/task_test.go
index 1cc712d4..219af2dd 100644
--- a/test/api/task/task_test.go
+++ b/test/api/task/task_test.go
@@ -68,7 +68,7 @@ func TestNewTask(t *testing.T) {
        assert.Equal(t, pipeline.Name, "hello")
 
        var tasks [][]*models.NewTask
-       err = json.Unmarshal(pipeline.Tasks, &tasks)
+       err = json.Unmarshal(pipeline.Plan, &tasks)
        if err != nil {
                t.Fatal(err)
        }

Reply via email to