This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b61d4ab7 Issues/3234 add parallelLabels for blueprint/pipeline (#3764)
3b61d4ab7 is described below

commit 3b61d4ab76fe85f45348c35d7515372c1d4d0a3c
Author: Likyh <[email protected]>
AuthorDate: Thu Nov 24 10:47:45 2022 +0800

    Issues/3234 add parallelLabels for blueprint/pipeline (#3764)
    
    * fix: delete a unuse var
    
    * feat: add parallelLabels in pipeline
    
    * feat: add parallel_label in pipeline api
    
    * feat: add parallel labels for blueprint
    
    * fix: fix for linter
    
    * fix: fix some bug found by api docs
    
    * feat: use common label to replace parallel
    
    * fix: fix a bug
    
    * fix: add lock for review
    
    * fix: fix for review, add Labels in the model DbPipeline and DbBlueprint.
    
    * fix: use tab instead of space in sql
---
 api/blueprints/blueprints.go                       | 10 ++-
 api/pipelines/pipelines.go                         | 12 +--
 config-ui/src/hooks/usePipelineManager.jsx         | 10 +--
 .../src/pages/blueprints/blueprint-detail.jsx      |  2 +-
 .../src/pages/blueprints/create-blueprint.jsx      |  2 +-
 models/blueprint.go                                | 16 +++-
 models/migrationscripts/20221115_add_labels.go     | 61 ++++++++++++++
 models/migrationscripts/register.go                |  1 +
 models/pipeline.go                                 | 15 ++++
 plugins/helper/api_async_client.go                 |  1 -
 plugins/helper/worker_scheduler.go                 |  1 -
 plugins/helper/worker_scheduler_test.go            |  2 +-
 services/blueprint.go                              | 58 +++++++-------
 services/blueprint_helper.go                       | 93 +++++++++++++++++-----
 services/pipeline.go                               | 53 +++++++++---
 services/pipeline_helper.go                        | 71 ++++++++++++++---
 services/pipeline_runner.go                        | 21 ++---
 utils/slice.go                                     | 37 +++++++++
 utils/slice_test.go                                | 30 +++++++
 19 files changed, 402 insertions(+), 94 deletions(-)

diff --git a/api/blueprints/blueprints.go b/api/blueprints/blueprints.go
index 517f6c30e..a28642672 100644
--- a/api/blueprints/blueprints.go
+++ b/api/blueprints/blueprints.go
@@ -29,8 +29,8 @@ import (
 )
 
 type PaginatedBlueprint struct {
-       Blueprints []*models.Blueprint
-       Count      int64
+       Blueprints []*models.Blueprint `json:"blueprints"`
+       Count      int64               `json:"count"`
 }
 
 // @Summary post blueprints
@@ -63,7 +63,11 @@ func Post(c *gin.Context) {
 // @Summary get blueprints
 // @Description get blueprints
 // @Tags framework/blueprints
-// @Accept application/json
+// @Param enable query bool false "enable"
+// @Param is_manual query bool false "is_manual"
+// @Param page query int false "page"
+// @Param page_size query int false "page_size"
+// @Param label query string false "label"
 // @Success 200  {object} PaginatedBlueprint
 // @Failure 400  {object} shared.ApiBody "Bad Request"
 // @Failure 500  {object} shared.ApiBody "Internal Error"
diff --git a/api/pipelines/pipelines.go b/api/pipelines/pipelines.go
index 4a144f124..c35ab5d19 100644
--- a/api/pipelines/pipelines.go
+++ b/api/pipelines/pipelines.go
@@ -81,12 +81,14 @@ GET 
/pipelines?status=TASK_RUNNING&pending=1&page=1&pagesize=10
 */
 
 // @Summary Get list of pipelines
-// @Description GET /pipelines?status=TASK_RUNNING&pending=1&page=1&pagesize=10
+// @Description GET 
/pipelines?status=TASK_RUNNING&pending=1&label=search_text&page=1&pagesize=10
 // @Tags framework/pipelines
-// @Param status query string true "query"
-// @Param pending query int true "query"
-// @Param page query int true "query"
-// @Param pagesize query int true "query"
+// @Param status query string false "status"
+// @Param pending query int false "pending"
+// @Param page query int false "page"
+// @Param pagesize query int false "pagesize"
+// @Param blueprint_id query int false "blueprint_id"
+// @Param label query string false "label"
 // @Success 200  {object} shared.ResponsePipelines
 // @Failure 400  {string} errcode.Error "Bad Request"
 // @Failure 500  {string} errcode.Error "Internel Error"
diff --git a/config-ui/src/hooks/usePipelineManager.jsx 
b/config-ui/src/hooks/usePipelineManager.jsx
index c9e3ab17f..fdbd7647e 100644
--- a/config-ui/src/hooks/usePipelineManager.jsx
+++ b/config-ui/src/hooks/usePipelineManager.jsx
@@ -57,21 +57,19 @@ function usePipelineManager(
   const [logfile, setLogfile] = useState('logging.tar.gz')
 
   const runPipeline = useCallback(
-    (runSettings = null) => {
-      console.log('>> RUNNING PIPELINE....')
+    (blueprintId) => {
+      console.log('>> RUNNING PIPELINE....', blueprintId)
       try {
         setIsRunning(true)
         setErrors([])
         ToastNotification.clear()
-        console.log('>> DISPATCHING PIPELINE REQUEST', runSettings || settings)
         const run = async () => {
           // @todo: remove "ID" fallback key when no longer needed
           const p = await request.post(
-            `${DEVLAKE_ENDPOINT}/pipelines`,
-            runSettings || settings
+            `${DEVLAKE_ENDPOINT}/blueprints/${blueprintId}/trigger`
           )
           const t = await request.get(
-            `${DEVLAKE_ENDPOINT}/pipelines/${p.data?.ID || p.data?.id}/tasks`
+            `${DEVLAKE_ENDPOINT}/pipelines/${p.data?.id}/tasks`
           )
           console.log('>> RAW PIPELINE DATA FROM API...', p.data)
           setPipelineRun({
diff --git a/config-ui/src/pages/blueprints/blueprint-detail.jsx 
b/config-ui/src/pages/blueprints/blueprint-detail.jsx
index 5d77b5368..72929b968 100644
--- a/config-ui/src/pages/blueprints/blueprint-detail.jsx
+++ b/config-ui/src/pages/blueprints/blueprint-detail.jsx
@@ -134,7 +134,7 @@ const BlueprintDetail = (props) => {
 
   const runBlueprint = useCallback(() => {
     if (activeBlueprint !== null) {
-      runPipeline()
+      runPipeline(activeBlueprint.id)
     }
   }, [activeBlueprint, runPipeline])
 
diff --git a/config-ui/src/pages/blueprints/create-blueprint.jsx 
b/config-ui/src/pages/blueprints/create-blueprint.jsx
index f8cdf75fd..11d1b3cef 100644
--- a/config-ui/src/pages/blueprints/create-blueprint.jsx
+++ b/config-ui/src/pages/blueprints/create-blueprint.jsx
@@ -812,7 +812,7 @@ const CreateBlueprint = (props) => {
         blueprintId: saveBlueprintComplete?.id,
         plan: saveBlueprintComplete?.plan
       }
-      runPipeline(newPipelineConfiguration)
+      runPipeline(saveBlueprintComplete?.id)
       setRunNow(false)
       history.push(`/blueprints/detail/${saveBlueprintComplete?.id}`)
     } else if (newBlueprintId) {
diff --git a/models/blueprint.go b/models/blueprint.go
index 4d00a0931..f223330ff 100644
--- a/models/blueprint.go
+++ b/models/blueprint.go
@@ -19,9 +19,9 @@ package models
 
 import (
        "encoding/json"
+       "time"
 
        "github.com/apache/incubator-devlake/errors"
-
        "github.com/apache/incubator-devlake/models/common"
        "github.com/apache/incubator-devlake/plugins/core"
 )
@@ -42,6 +42,7 @@ type Blueprint struct {
        CronConfig   string          `json:"cronConfig" format:"* * * * *" 
example:"0 0 * * 1"`
        IsManual     bool            `json:"isManual"`
        SkipOnFail   bool            `json:"skipOnFail"`
+       Labels       []string        `json:"labels"`
        Settings     json.RawMessage `json:"settings" 
swaggertype:"array,string" example:"please check api: 
/blueprints/<PLUGIN_NAME>/blueprint-setting"`
        common.Model `swaggerignore:"true"`
 }
@@ -77,8 +78,21 @@ type DbBlueprint struct {
        SkipOnFail   bool   `json:"skipOnFail"`
        Settings     string `json:"settings" encrypt:"yes" 
swaggertype:"array,string" example:"please check api: 
/blueprints/<PLUGIN_NAME>/blueprint-setting"`
        common.Model `swaggerignore:"true"`
+
+       Labels []DbBlueprintLabel `json:"-" gorm:"-"`
 }
 
 func (DbBlueprint) TableName() string {
        return "_devlake_blueprints"
 }
+
+type DbBlueprintLabel struct {
+       CreatedAt   time.Time `json:"createdAt"`
+       UpdatedAt   time.Time `json:"updatedAt"`
+       BlueprintId uint64    `json:"blueprint_id" gorm:"primaryKey"`
+       Name        string    `json:"name" gorm:"primaryKey;index"`
+}
+
+func (DbBlueprintLabel) TableName() string {
+       return "_devlake_blueprint_labels"
+}
diff --git a/models/migrationscripts/20221115_add_labels.go 
b/models/migrationscripts/20221115_add_labels.go
new file mode 100644
index 000000000..0c82066bf
--- /dev/null
+++ b/models/migrationscripts/20221115_add_labels.go
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "time"
+)
+
+type DbPipelineLabel20221115 struct {
+       CreatedAt  time.Time `json:"createdAt"`
+       UpdatedAt  time.Time `json:"updatedAt"`
+       PipelineId uint64    `json:"pipeline_id" gorm:"primaryKey"`
+       Name       string    `json:"name" gorm:"primaryKey;index"`
+}
+
+func (DbPipelineLabel20221115) TableName() string {
+       return "_devlake_pipeline_labels"
+}
+
+type DbBlueprintLabel20221115 struct {
+       CreatedAt   time.Time `json:"createdAt"`
+       UpdatedAt   time.Time `json:"updatedAt"`
+       BlueprintId uint64    `json:"blueprint_id" gorm:"primaryKey"`
+       Name        string    `json:"name" gorm:"primaryKey;index"`
+}
+
+func (DbBlueprintLabel20221115) TableName() string {
+       return "_devlake_blueprint_labels"
+}
+
+type addLabels struct{}
+
+func (*addLabels) Up(res core.BasicRes) errors.Error {
+       return migrationhelper.AutoMigrateTables(res, 
&DbPipelineLabel20221115{}, &DbBlueprintLabel20221115{})
+}
+
+func (*addLabels) Version() uint64 {
+       return 20221115000034
+}
+
+func (*addLabels) Name() string {
+       return "add labels' schema for blueprint and pipeline"
+}
diff --git a/models/migrationscripts/register.go 
b/models/migrationscripts/register.go
index 00478e238..943616df1 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -60,5 +60,6 @@ func All() []core.MigrationScript {
                new(addProjectTables),
                new(addProjectToBluePrint),
                new(addProjectIssueMetric),
+               new(addLabels),
        }
 }
diff --git a/models/pipeline.go b/models/pipeline.go
index e3d56465e..7427c2338 100644
--- a/models/pipeline.go
+++ b/models/pipeline.go
@@ -39,6 +39,7 @@ type Pipeline struct {
        Message       string         `json:"message"`
        SpentSeconds  int            `json:"spentSeconds"`
        Stage         int            `json:"stage"`
+       Labels        []string       `json:"labels"`
 }
 
 // We use a 2D array because the request body must be an array of a set of 
tasks
@@ -46,6 +47,7 @@ type Pipeline struct {
 type NewPipeline struct {
        Name        string            `json:"name"`
        Plan        core.PipelinePlan `json:"plan" swaggertype:"array,string" 
example:"please check api /pipelines/<PLUGIN_NAME>/pipeline-plan"`
+       Labels      []string          `json:"labels"`
        BlueprintId uint64
 }
 
@@ -62,8 +64,21 @@ type DbPipeline struct {
        Message       string     `json:"message"`
        SpentSeconds  int        `json:"spentSeconds"`
        Stage         int        `json:"stage"`
+
+       Labels []DbPipelineLabel `json:"-" gorm:"-"`
 }
 
 func (DbPipeline) TableName() string {
        return "_devlake_pipelines"
 }
+
+type DbPipelineLabel struct {
+       CreatedAt  time.Time `json:"createdAt"`
+       UpdatedAt  time.Time `json:"updatedAt"`
+       PipelineId uint64    `json:"pipeline_id" gorm:"primaryKey"`
+       Name       string    `json:"name" gorm:"primaryKey;index"`
+}
+
+func (DbPipelineLabel) TableName() string {
+       return "_devlake_pipeline_labels"
+}
diff --git a/plugins/helper/api_async_client.go 
b/plugins/helper/api_async_client.go
index a7cc71222..1d54b8a98 100644
--- a/plugins/helper/api_async_client.go
+++ b/plugins/helper/api_async_client.go
@@ -111,7 +111,6 @@ func CreateAsyncApiClient(
                numOfWorkers,
                requests,
                duration,
-               retry,
                logger,
        )
        if err != nil {
diff --git a/plugins/helper/worker_scheduler.go 
b/plugins/helper/worker_scheduler.go
index e74f228f6..45be63c3f 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -48,7 +48,6 @@ func NewWorkerScheduler(
        workerNum int,
        maxWork int,
        maxWorkDuration time.Duration,
-       maxRetry int,
        logger core.Logger,
 ) (*WorkerScheduler, errors.Error) {
        if maxWork <= 0 {
diff --git a/plugins/helper/worker_scheduler_test.go 
b/plugins/helper/worker_scheduler_test.go
index 22f055822..e95446596 100644
--- a/plugins/helper/worker_scheduler_test.go
+++ b/plugins/helper/worker_scheduler_test.go
@@ -31,7 +31,7 @@ func TestWorkerSchedulerQpsControl(t *testing.T) {
        // assuming we want 2 requests per second
        testChannel := make(chan int, 100)
        ctx, cancel := context.WithCancel(context.Background())
-       s, _ := NewWorkerScheduler(ctx, 5, 2, 1*time.Second, 0, 
unithelper.DummyLogger())
+       s, _ := NewWorkerScheduler(ctx, 5, 2, 1*time.Second, 
unithelper.DummyLogger())
        defer s.Release()
        for i := 1; i <= 5; i++ {
                t := i
diff --git a/services/blueprint.go b/services/blueprint.go
index 6c080ebdb..2c0cc7669 100644
--- a/services/blueprint.go
+++ b/services/blueprint.go
@@ -34,11 +34,13 @@ import (
        "gorm.io/gorm"
 )
 
-// BlueprintQuery FIXME ...
+// BlueprintQuery is a query for GetBlueprints
 type BlueprintQuery struct {
-       Enable   *bool `form:"enable,omitempty"`
-       Page     int   `form:"page"`
-       PageSize int   `form:"pageSize"`
+       Enable   *bool  `form:"enable,omitempty"`
+       IsManual *bool  `form:"is_manual"`
+       Page     int    `form:"page"`
+       PageSize int    `form:"pageSize"`
+       Label    string `form:"label"`
 }
 
 var (
@@ -52,11 +54,12 @@ func CreateBlueprint(blueprint *models.Blueprint) 
errors.Error {
        if err != nil {
                return err
        }
-       dbBlueprint, err := encryptDbBlueprint(parseDbBlueprint(blueprint))
+       dbBlueprint := parseDbBlueprint(blueprint)
+       dbBlueprint, err = encryptDbBlueprint(dbBlueprint)
        if err != nil {
                return err
        }
-       err = CreateDbBlueprint(dbBlueprint)
+       err = SaveDbBlueprint(dbBlueprint)
        if err != nil {
                return err
        }
@@ -177,9 +180,14 @@ func PatchBlueprint(id uint64, body 
map[string]interface{}) (*models.Blueprint,
        }
 
        // save
-       err = save(blueprint)
+       dbBlueprint := parseDbBlueprint(blueprint)
+       dbBlueprint, err = encryptDbBlueprint(dbBlueprint)
        if err != nil {
-               return nil, errors.Internal.Wrap(err, "error saving blueprint")
+               return nil, err
+       }
+       err = SaveDbBlueprint(dbBlueprint)
+       if err != nil {
+               return nil, err
        }
 
        // reload schedule
@@ -206,29 +214,29 @@ func DeleteBlueprint(id uint64) errors.Error {
 
 // ReloadBlueprints FIXME ...
 func ReloadBlueprints(c *cron.Cron) errors.Error {
-       dbBlueprints := make([]*models.DbBlueprint, 0)
-       if err := db.Model(&models.DbBlueprint{}).
-               Where("enable = ? AND is_manual = ?", true, false).
-               Find(&dbBlueprints).Error; err != nil {
-               return errors.Internal.Wrap(err, "error finding blueprints 
while reloading")
+       enable := true
+       isManual := false
+       dbBlueprints, _, err := GetDbBlueprints(&BlueprintQuery{Enable: 
&enable, IsManual: &isManual})
+       if err != nil {
+               return err
        }
        for _, e := range c.Entries() {
                c.Remove(e.ID)
        }
        c.Stop()
-       for _, pp := range dbBlueprints {
-               pp, err := decryptDbBlueprint(pp)
+       for _, dbBlueprint := range dbBlueprints {
+               dbBlueprint, err = decryptDbBlueprint(dbBlueprint)
                if err != nil {
                        return err
                }
-               blueprint := parseBlueprint(pp)
+               blueprint := parseBlueprint(dbBlueprint)
                plan, err := blueprint.UnmarshalPlan()
                if err != nil {
                        blueprintLog.Error(err, failToCreateCronJob)
                        return err
                }
                if _, err := c.AddFunc(blueprint.CronConfig, func() {
-                       pipeline, err := 
createPipelineByBlueprint(blueprint.ID, blueprint.Name, plan)
+                       pipeline, err := createPipelineByBlueprint(blueprint, 
blueprint.Name, plan)
                        if err != nil {
                                blueprintLog.Error(err, "run cron job failed")
                        } else {
@@ -246,11 +254,12 @@ func ReloadBlueprints(c *cron.Cron) errors.Error {
        return nil
 }
 
-func createPipelineByBlueprint(blueprintId uint64, name string, plan 
core.PipelinePlan) (*models.Pipeline, errors.Error) {
+func createPipelineByBlueprint(blueprint *models.Blueprint, name string, plan 
core.PipelinePlan) (*models.Pipeline, errors.Error) {
        newPipeline := models.NewPipeline{}
        newPipeline.Plan = plan
        newPipeline.Name = name
-       newPipeline.BlueprintId = blueprintId
+       newPipeline.BlueprintId = blueprint.ID
+       newPipeline.Labels = blueprint.Labels
        pipeline, err := CreatePipeline(&newPipeline)
        // Return all created tasks to the User
        if err != nil {
@@ -342,15 +351,8 @@ func TriggerBlueprint(id uint64) (*models.Pipeline, 
errors.Error) {
        if err != nil {
                return nil, err
        }
-       pipeline, err := createPipelineByBlueprint(blueprint.ID, 
blueprint.Name, plan)
+
+       pipeline, err := createPipelineByBlueprint(blueprint, blueprint.Name, 
plan)
        // done
        return pipeline, err
 }
-func save(blueprint *models.Blueprint) errors.Error {
-       dbBlueprint := parseDbBlueprint(blueprint)
-       dbBlueprint, err := encryptDbBlueprint(dbBlueprint)
-       if err != nil {
-               return err
-       }
-       return errors.Convert(db.Save(dbBlueprint).Error)
-}
diff --git a/services/blueprint_helper.go b/services/blueprint_helper.go
index 8a56bb710..e8fad2b81 100644
--- a/services/blueprint_helper.go
+++ b/services/blueprint_helper.go
@@ -26,36 +26,76 @@ import (
        "gorm.io/gorm"
 )
 
-// CreateDbBlueprint accepts a Blueprint instance and insert it to database
-func CreateDbBlueprint(dbBlueprint *models.DbBlueprint) errors.Error {
-       err := db.Create(&dbBlueprint).Error
+// SaveDbBlueprint accepts a Blueprint instance and upsert it to database
+func SaveDbBlueprint(dbBlueprint *models.DbBlueprint) errors.Error {
+       var err error
+       if dbBlueprint.ID != 0 {
+               err = db.Save(&dbBlueprint).Error
+       } else {
+               err = db.Create(&dbBlueprint).Error
+       }
        if err != nil {
                return errors.Default.Wrap(err, "error creating DB blueprint")
        }
+       err = db.Delete(&models.DbBlueprintLabel{}, `blueprint_id = ?`, 
dbBlueprint.ID).Error
+       if err != nil {
+               return errors.Default.Wrap(err, "error delete DB blueprint's 
old labelModels")
+       }
+       if len(dbBlueprint.Labels) > 0 {
+               for i := range dbBlueprint.Labels {
+                       dbBlueprint.Labels[i].BlueprintId = dbBlueprint.ID
+               }
+               err = db.Create(&dbBlueprint.Labels).Error
+               if err != nil {
+                       return errors.Default.Wrap(err, "error creating DB 
blueprint's labelModels")
+               }
+       }
        return nil
 }
 
 // GetDbBlueprints returns a paginated list of Blueprints based on `query`
 func GetDbBlueprints(query *BlueprintQuery) ([]*models.DbBlueprint, int64, 
errors.Error) {
        dbBlueprints := make([]*models.DbBlueprint, 0)
-       db := db.Model(dbBlueprints).Order("id DESC")
+       dbQuery := db.Model(dbBlueprints).Order("id DESC")
        if query.Enable != nil {
-               db = db.Where("enable = ?", *query.Enable)
+               dbQuery = dbQuery.Where("enable = ?", *query.Enable)
+       }
+       if query.IsManual != nil {
+               dbQuery = dbQuery.Where("is_manual = ?", *query.IsManual)
+       }
+       if query.Label != "" {
+               dbQuery = dbQuery.
+                       Joins(`left join _devlake_blueprint_labels ON 
_devlake_blueprint_labels.blueprint_id = _devlake_blueprints.id`).
+                       Where(`_devlake_blueprint_labels.name = ?`, query.Label)
        }
 
        var count int64
-       err := db.Count(&count).Error
+       err := dbQuery.Count(&count).Error
        if err != nil {
                return nil, 0, errors.Default.Wrap(err, "error getting DB count 
of blueprints")
        }
 
-       db = processDbClausesWithPager(db, query.PageSize, query.Page)
+       dbQuery = processDbClausesWithPager(dbQuery, query.PageSize, query.Page)
 
-       err = db.Find(&dbBlueprints).Error
+       err = dbQuery.Find(&dbBlueprints).Error
        if err != nil {
                return nil, 0, errors.Default.Wrap(err, "error finding DB 
blueprints")
        }
 
+       var blueprintIds []uint64
+       for _, dbBlueprint := range dbBlueprints {
+               blueprintIds = append(blueprintIds, dbBlueprint.ID)
+       }
+       var dbLabels []models.DbBlueprintLabel
+       dbLabelsMap := map[uint64][]models.DbBlueprintLabel{}
+       db.Where(`blueprint_id in ?`, blueprintIds).Find(&dbLabels)
+       for _, dbLabel := range dbLabels {
+               dbLabelsMap[dbLabel.BlueprintId] = 
append(dbLabelsMap[dbLabel.BlueprintId], dbLabel)
+       }
+       for _, dbBlueprint := range dbBlueprints {
+               dbBlueprint.Labels = dbLabelsMap[dbBlueprint.ID]
+       }
+
        return dbBlueprints, count, nil
 }
 
@@ -69,6 +109,10 @@ func GetDbBlueprint(dbBlueprintId uint64) 
(*models.DbBlueprint, errors.Error) {
                }
                return nil, errors.Default.Wrap(err, "error getting blueprint 
from DB")
        }
+       err = db.Find(&dbBlueprint.Labels, "blueprint_id = ?", 
dbBlueprint.ID).Error
+       if err != nil {
+               return nil, errors.Internal.Wrap(err, "error getting the 
blueprint from database")
+       }
        return dbBlueprint, nil
 }
 
@@ -82,17 +126,22 @@ func DeleteDbBlueprint(id uint64) errors.Error {
 }
 
 // parseBlueprint
-func parseBlueprint(DbBlueprint *models.DbBlueprint) *models.Blueprint {
+func parseBlueprint(dbBlueprint *models.DbBlueprint) *models.Blueprint {
+       labelList := []string{}
+       for _, labelModel := range dbBlueprint.Labels {
+               labelList = append(labelList, labelModel.Name)
+       }
        blueprint := models.Blueprint{
-               Name:       DbBlueprint.Name,
-               Mode:       DbBlueprint.Mode,
-               Plan:       []byte(DbBlueprint.Plan),
-               Enable:     DbBlueprint.Enable,
-               CronConfig: DbBlueprint.CronConfig,
-               IsManual:   DbBlueprint.IsManual,
-               SkipOnFail: DbBlueprint.SkipOnFail,
-               Settings:   []byte(DbBlueprint.Settings),
-               Model:      DbBlueprint.Model,
+               Name:       dbBlueprint.Name,
+               Mode:       dbBlueprint.Mode,
+               Plan:       []byte(dbBlueprint.Plan),
+               Enable:     dbBlueprint.Enable,
+               CronConfig: dbBlueprint.CronConfig,
+               IsManual:   dbBlueprint.IsManual,
+               SkipOnFail: dbBlueprint.SkipOnFail,
+               Settings:   []byte(dbBlueprint.Settings),
+               Model:      dbBlueprint.Model,
+               Labels:     labelList,
        }
        return &blueprint
 }
@@ -110,6 +159,14 @@ func parseDbBlueprint(blueprint *models.Blueprint) 
*models.DbBlueprint {
                Settings:   string(blueprint.Settings),
                Model:      blueprint.Model,
        }
+       dbBlueprint.Labels = []models.DbBlueprintLabel{}
+       for _, label := range blueprint.Labels {
+               dbBlueprint.Labels = append(dbBlueprint.Labels, 
models.DbBlueprintLabel{
+                       // NOTICE: BlueprintId may be nil
+                       BlueprintId: blueprint.ID,
+                       Name:        label,
+               })
+       }
        return &dbBlueprint
 }
 
diff --git a/services/pipeline.go b/services/pipeline.go
index 5bcbf3fd3..52a05d6f7 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -23,6 +23,7 @@ import (
        "os"
        "path/filepath"
        "strings"
+       "sync"
        "time"
 
        "github.com/apache/incubator-devlake/errors"
@@ -40,13 +41,14 @@ var notificationService *NotificationService
 var temporalClient client.Client
 var globalPipelineLog = logger.Global.Nested("pipeline service")
 
-// PipelineQuery FIXME ...
+// PipelineQuery is a query for GetPipelines
 type PipelineQuery struct {
        Status      string `form:"status"`
        Pending     int    `form:"pending"`
        Page        int    `form:"page"`
        PageSize    int    `form:"pageSize"`
        BlueprintId uint64 `uri:"blueprintId" form:"blueprint_id"`
+       Label       string `form:"label"`
 }
 
 func pipelineServiceInit() {
@@ -155,6 +157,8 @@ func GetPipelineLogsArchivePath(pipeline *models.Pipeline) 
(string, errors.Error
 // RunPipelineInQueue query pipeline from db and run it in a queue
 func RunPipelineInQueue(pipelineMaxParallel int64) {
        sema := semaphore.NewWeighted(pipelineMaxParallel)
+       runningParallelLabels := []string{}
+       var runningParallelLabelLock sync.Mutex
        for {
                globalPipelineLog.Info("acquire lock")
                // start goroutine when sema lock ready and pipeline exist.
@@ -167,27 +171,58 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
                dbPipeline := &models.DbPipeline{}
                for {
                        cronLocker.Lock()
+                       // prepare query to find an appropriate pipeline to 
execute
                        db.Where("status IN ?", []string{models.TASK_CREATED, 
models.TASK_RERUN}).
+                               Joins(`left join _devlake_pipeline_labels ON
+                                               
_devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND
+                                               _devlake_pipeline_labels.name 
LIKE 'parallel/%' AND
+                                               _devlake_pipeline_labels.name 
in ?`, runningParallelLabels).
+                               Group(`id`).
+                               
Having(`count(_devlake_pipeline_labels.name)=0`).
+                               Select("id").
                                Order("id ASC").Limit(1).Find(dbPipeline)
                        cronLocker.Unlock()
                        if dbPipeline.ID != 0 {
-                               db.Model(&models.DbPipeline{}).Where("id = ?", 
dbPipeline.ID).Updates(map[string]interface{}{
-                                       "status":   models.TASK_RUNNING,
-                                       "message":  "",
-                                       "began_at": time.Now(),
-                               })
                                break
                        }
                        time.Sleep(time.Second)
                }
-               go func(pipelineId uint64) {
+
+               db.Model(&models.DbPipeline{}).Where("id = ?", 
dbPipeline.ID).Updates(map[string]interface{}{
+                       "status":   models.TASK_RUNNING,
+                       "message":  "",
+                       "began_at": time.Now(),
+               })
+               dbPipeline, err = GetDbPipeline(dbPipeline.ID)
+               if err != nil {
+                       panic(err)
+               }
+
+               // add pipelineParallelLabels to runningParallelLabels
+               var pipelineParallelLabels []string
+               for _, dbLabel := range dbPipeline.Labels {
+                       if strings.HasPrefix(dbLabel.Name, `parallel/`) {
+                               pipelineParallelLabels = 
append(pipelineParallelLabels, dbLabel.Name)
+                       }
+               }
+               runningParallelLabelLock.Lock()
+               runningParallelLabels = append(runningParallelLabels, 
pipelineParallelLabels...)
+               runningParallelLabelLock.Unlock()
+
+               go func(pipelineId uint64, parallelLabels []string) {
                        defer sema.Release(1)
-                       globalPipelineLog.Info("run pipeline, %d", pipelineId)
+                       defer func() {
+                               runningParallelLabelLock.Lock()
+                               runningParallelLabels = 
utils.SliceRemove(runningParallelLabels, parallelLabels...)
+                               runningParallelLabelLock.Unlock()
+                               globalPipelineLog.Info("finish pipeline #%d, 
now runningParallelLabels is %s", pipelineId, runningParallelLabels)
+                       }()
+                       globalPipelineLog.Info("run pipeline, %d, now running 
runningParallelLabels are %s", pipelineId, runningParallelLabels)
                        err = runPipeline(pipelineId)
                        if err != nil {
                                globalPipelineLog.Error(err, "failed to run 
pipeline %d", pipelineId)
                        }
-               }(dbPipeline.ID)
+               }(dbPipeline.ID, pipelineParallelLabels)
        }
 }
 
diff --git a/services/pipeline_helper.go b/services/pipeline_helper.go
index a8e38dc18..2d59f802f 100644
--- a/services/pipeline_helper.go
+++ b/services/pipeline_helper.go
@@ -52,10 +52,25 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) 
(*models.DbPipeline, erro
        if err != nil {
                return nil, err
        }
+
        // save pipeline to database
        if err := db.Create(&dbPipeline).Error; err != nil {
-               globalPipelineLog.Error(err, "create pipline failed: %v", err)
-               return nil, errors.Internal.Wrap(err, "create pipline failed")
+               globalPipelineLog.Error(err, "create pipeline failed: %v", err)
+               return nil, errors.Internal.Wrap(err, "create pipeline failed")
+       }
+
+       dbPipeline.Labels = []models.DbPipelineLabel{}
+       for _, label := range newPipeline.Labels {
+               dbPipeline.Labels = append(dbPipeline.Labels, 
models.DbPipelineLabel{
+                       PipelineId: dbPipeline.ID,
+                       Name:       label,
+               })
+       }
+       if len(dbPipeline.Labels) > 0 {
+               if err := db.Create(&dbPipeline.Labels).Error; err != nil {
+                       globalPipelineLog.Error(err, "create pipeline's 
labelModels failed: %v", err)
+                       return nil, errors.Internal.Wrap(err, "create 
pipeline's labelModels failed")
+               }
        }
 
        // create tasks accordingly
@@ -99,28 +114,48 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) 
(*models.DbPipeline, erro
 // GetDbPipelines by query
 func GetDbPipelines(query *PipelineQuery) ([]*models.DbPipeline, int64, 
errors.Error) {
        dbPipelines := make([]*models.DbPipeline, 0)
-       db := db.Model(dbPipelines).Order("id DESC")
+       dbQuery := db.Model(dbPipelines).Order("id DESC")
        if query.BlueprintId != 0 {
-               db = db.Where("blueprint_id = ?", query.BlueprintId)
+               dbQuery = dbQuery.Where("blueprint_id = ?", query.BlueprintId)
        }
        if query.Status != "" {
-               db = db.Where("status = ?", query.Status)
+               dbQuery = dbQuery.Where("status = ?", query.Status)
        }
        if query.Pending > 0 {
-               db = db.Where("finished_at is null and status != ?", 
"TASK_FAILED")
+               dbQuery = dbQuery.Where("finished_at is null and status != ?", 
"TASK_FAILED")
+       }
+       if query.Label != "" {
+               dbQuery = dbQuery.
+                       Joins(`left join _devlake_pipeline_labels ON 
_devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id`).
+                       Where(`_devlake_pipeline_labels.name = ?`, query.Label)
        }
        var count int64
-       err := db.Count(&count).Error
+       err := dbQuery.Count(&count).Error
        if err != nil {
                return nil, 0, errors.Default.Wrap(err, "error getting DB 
pipelines count")
        }
 
-       db = processDbClausesWithPager(db, query.PageSize, query.Page)
+       dbQuery = processDbClausesWithPager(dbQuery, query.PageSize, query.Page)
 
-       err = db.Find(&dbPipelines).Error
+       err = dbQuery.Find(&dbPipelines).Error
        if err != nil {
                return nil, count, errors.Default.Wrap(err, "error finding DB 
pipelines")
        }
+
+       var pipelineIds []uint64
+       for _, dbPipeline := range dbPipelines {
+               pipelineIds = append(pipelineIds, dbPipeline.ID)
+       }
+       dbLabels := []models.DbPipelineLabel{}
+       db.Where(`pipeline_id in ?`, pipelineIds).Find(&dbLabels)
+       dbLabelsMap := map[uint64][]models.DbPipelineLabel{}
+       for _, dbLabel := range dbLabels {
+               dbLabelsMap[dbLabel.PipelineId] = 
append(dbLabelsMap[dbLabel.PipelineId], dbLabel)
+       }
+       for _, dbPipeline := range dbPipelines {
+               dbPipeline.Labels = dbLabelsMap[dbPipeline.ID]
+       }
+
        return dbPipelines, count, nil
 }
 
@@ -134,11 +169,19 @@ func GetDbPipeline(pipelineId uint64) 
(*models.DbPipeline, errors.Error) {
                }
                return nil, errors.Internal.Wrap(err, "error getting the 
pipeline from database")
        }
+       err = db.Find(&dbPipeline.Labels, "pipeline_id = ?", pipelineId).Error
+       if err != nil {
+               return nil, errors.Internal.Wrap(err, "error getting the 
pipeline from database")
+       }
        return dbPipeline, nil
 }
 
 // parsePipeline converts DbPipeline to Pipeline
 func parsePipeline(dbPipeline *models.DbPipeline) *models.Pipeline {
+       labelList := []string{}
+       for _, labelModel := range dbPipeline.Labels {
+               labelList = append(labelList, labelModel.Name)
+       }
        pipeline := models.Pipeline{
                Model:         dbPipeline.Model,
                Name:          dbPipeline.Name,
@@ -152,11 +195,13 @@ func parsePipeline(dbPipeline *models.DbPipeline) 
*models.Pipeline {
                Message:       dbPipeline.Message,
                SpentSeconds:  dbPipeline.SpentSeconds,
                Stage:         dbPipeline.Stage,
+               Labels:        labelList,
        }
        return &pipeline
 }
 
 // parseDbPipeline converts Pipeline to DbPipeline
+// nolint:unused
 func parseDbPipeline(pipeline *models.Pipeline) *models.DbPipeline {
        dbPipeline := models.DbPipeline{
                Model:         pipeline.Model,
@@ -172,6 +217,14 @@ func parseDbPipeline(pipeline *models.Pipeline) 
*models.DbPipeline {
                SpentSeconds:  pipeline.SpentSeconds,
                Stage:         pipeline.Stage,
        }
+       dbPipeline.Labels = []models.DbPipelineLabel{}
+       for _, label := range pipeline.Labels {
+               dbPipeline.Labels = append(dbPipeline.Labels, 
models.DbPipelineLabel{
+                       // NOTICE: PipelineId may be nil
+                       PipelineId: pipeline.ID,
+                       Name:       label,
+               })
+       }
        return &dbPipeline
 }
 
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index 0ec968704..8eea52ebb 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -117,27 +117,28 @@ func runPipeline(pipelineId uint64) errors.Error {
        if err != nil {
                err = errors.Default.Wrap(err, fmt.Sprintf("Error running 
pipeline %d.", pipelineId))
        }
-       pipeline, e := GetPipeline(pipelineId)
+       dbPipeline, e := GetDbPipeline(pipelineId)
        if e != nil {
                return errors.Default.Wrap(err, fmt.Sprintf("Unable to get 
pipeline %d.", pipelineId))
        }
        // finished, update database
        finishedAt := time.Now()
-       pipeline.FinishedAt = &finishedAt
-       pipeline.SpentSeconds = int(finishedAt.Unix() - pipeline.BeganAt.Unix())
+       dbPipeline.FinishedAt = &finishedAt
+       if dbPipeline.BeganAt != nil {
+               dbPipeline.SpentSeconds = int(finishedAt.Unix() - 
dbPipeline.BeganAt.Unix())
+       }
        if err != nil {
-               pipeline.Status = models.TASK_FAILED
+               dbPipeline.Status = models.TASK_FAILED
                if lakeErr := errors.AsLakeErrorType(err); lakeErr != nil {
-                       pipeline.Message = lakeErr.Messages().Format()
+                       dbPipeline.Message = lakeErr.Messages().Format()
                } else {
-                       pipeline.Message = err.Error()
+                       dbPipeline.Message = err.Error()
                }
        } else {
-               pipeline.Status = models.TASK_COMPLETED
-               pipeline.Message = ""
+               dbPipeline.Status = models.TASK_COMPLETED
+               dbPipeline.Message = ""
        }
-       dbPipeline := parseDbPipeline(pipeline)
-       dbe := db.Model(dbPipeline).Select("finished_at", "spent_seconds", 
"status", "message").Updates(dbPipeline).Error
+       dbe := db.Model(dbPipeline).Updates(dbPipeline).Error
        if dbe != nil {
                globalPipelineLog.Error(dbe, "update pipeline state failed")
                return errors.Convert(dbe)
diff --git a/utils/slice.go b/utils/slice.go
new file mode 100644
index 000000000..659a21078
--- /dev/null
+++ b/utils/slice.go
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+// SliceRemove remove some items in old slice
+func SliceRemove[T ~int | ~string](source []T, toRemoves ...T) []T {
+       j := 0
+       for _, v := range source {
+               needRemove := false
+               for _, toRemove := range toRemoves {
+                       if v == toRemove {
+                               needRemove = true
+                               break
+                       }
+               }
+               if !needRemove {
+                       source[j] = v
+                       j++
+               }
+       }
+       return source[:j]
+}
diff --git a/utils/slice_test.go b/utils/slice_test.go
new file mode 100644
index 000000000..d55f8ae6e
--- /dev/null
+++ b/utils/slice_test.go
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+       "github.com/stretchr/testify/assert"
+       "testing"
+)
+
+// TestSliceRemove test the SliceRemove
+func TestSliceRemove(t *testing.T) {
+       assert.Equal(t, []int{3, 4, 5}, SliceRemove([]int{1, 2, 3, 4, 5}, 1, 2))
+       assert.Equal(t, []int{1, 2, 4, 5}, SliceRemove([]int{1, 2, 3, 4, 5}, 3, 
3))
+       assert.Equal(t, []string{`1`, `2`, `4`, `5`}, SliceRemove([]string{`1`, 
`2`, `3`, `4`, `5`}, `3`, `3`))
+}


Reply via email to