This is an automated email from the ASF dual-hosted git repository.

mappjzc pushed a commit to branch release-v0.16
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.16 by this push:
     new 9b353a68d fix: cherry pick #4766 (#4768)
9b353a68d is described below

commit 9b353a68d7c58c6debbdf0e56f9d07bd91cc9fde
Author: mappjzc <[email protected]>
AuthorDate: Fri Mar 24 21:01:45 2023 +0800

    fix: cherry pick #4766 (#4768)
    
    Cherry-pick the pr #4766 to v0.16
    
    Nddtfjiang <[email protected]>
---
 backend/server/api/blueprints/blueprints.go |  9 +++------
 backend/server/services/blueprint.go        |  7 ++++---
 backend/server/services/pipeline_helper.go  | 29 +++++++++++++++++++++++------
 3 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/backend/server/api/blueprints/blueprints.go 
b/backend/server/api/blueprints/blueprints.go
index e25b2cf0e..9810ba585 100644
--- a/backend/server/api/blueprints/blueprints.go
+++ b/backend/server/api/blueprints/blueprints.go
@@ -18,12 +18,13 @@ limitations under the License.
 package blueprints
 
 import (
+       "net/http"
+       "strconv"
+
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/models"
        "github.com/apache/incubator-devlake/server/api/shared"
        "github.com/apache/incubator-devlake/server/services"
-       "net/http"
-       "strconv"
 
        "github.com/gin-gonic/gin"
 )
@@ -179,10 +180,6 @@ func Trigger(c *gin.Context) {
                return
        }
        pipeline, err := services.TriggerBlueprint(id)
-       if errors.Is(err, services.ErrBlueprintRunning) {
-               shared.ApiOutputError(c, errors.BadInput.Wrap(err, "the 
blueprint is running"))
-               return
-       }
        if err != nil {
                shared.ApiOutputError(c, errors.Default.Wrap(err, "error 
triggering blueprint"))
                return
diff --git a/backend/server/services/blueprint.go 
b/backend/server/services/blueprint.go
index 504853b33..17819f133 100644
--- a/backend/server/services/blueprint.go
+++ b/backend/server/services/blueprint.go
@@ -223,9 +223,9 @@ func ReloadBlueprints(c *cron.Cron) errors.Error {
                if _, err := c.AddFunc(blueprint.CronConfig, func() {
                        pipeline, err := createPipelineByBlueprint(blueprint)
                        if err != nil {
-                               blueprintLog.Error(err, "run cron job failed")
+                               blueprintLog.Error(err, fmt.Sprintf("run cron 
job failed on blueprint:[%d][%s]", blueprint.ID, blueprint.Name))
                        } else {
-                               blueprintLog.Info("Run new cron job 
successfully, pipeline id: %d", pipeline.ID)
+                               blueprintLog.Info("Run new cron job 
successfully,pipeline id:[%d] pipeline id:[%d]", blueprint.ID, pipeline.ID)
                        }
                }); err != nil {
                        blueprintLog.Error(err, failToCreateCronJob)
@@ -248,6 +248,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint) 
(*models.Pipeline, e
                plan, err = blueprint.UnmarshalPlan()
        }
        if err != nil {
+               blueprintLog.Error(err, fmt.Sprintf("failed to 
MakePlanForBlueprint on blueprint:[%d][%s]", blueprint.ID, blueprint.Name))
                return nil, err
        }
        newPipeline := models.NewPipeline{}
@@ -259,7 +260,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint) 
(*models.Pipeline, e
        pipeline, err := CreatePipeline(&newPipeline)
        // Return all created tasks to the User
        if err != nil {
-               blueprintLog.Error(err, failToCreateCronJob)
+               blueprintLog.Error(err, fmt.Sprintf("%s on blueprint:[%d][%s]", 
failToCreateCronJob, blueprint.ID, blueprint.Name))
                return nil, errors.Convert(err)
        }
        return pipeline, nil
diff --git a/backend/server/services/pipeline_helper.go 
b/backend/server/services/pipeline_helper.go
index 03f3e322b..c0dd11cb6 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -20,28 +20,45 @@ package services
 import (
        "encoding/json"
        "fmt"
+
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/models"
 )
 
-// ErrBlueprintRunning indicates there is a running pipeline with the 
specified blueprint_id
-var ErrBlueprintRunning = errors.Default.New("the blueprint is running")
-
 // CreateDbPipeline returns a NewPipeline
 func CreateDbPipeline(newPipeline *models.NewPipeline) (*models.Pipeline, 
errors.Error) {
        cronLocker.Lock()
        defer cronLocker.Unlock()
        if newPipeline.BlueprintId > 0 {
-               count, err := db.Count(
+               clauses := []dal.Clause{
                        dal.From(&models.Pipeline{}),
                        dal.Where("blueprint_id = ? AND status IN ?", 
newPipeline.BlueprintId, models.PendingTaskStatus),
-               )
+               }
+               count, err := db.Count(clauses...)
                if err != nil {
                        return nil, errors.Default.Wrap(err, "query pipelines 
error")
                }
+               // some pipeline is ruunning , get the detail and output them.
                if count > 0 {
-                       return nil, ErrBlueprintRunning
+                       cursor, err := db.Cursor(clauses...)
+                       if err != nil {
+                               return nil, errors.Default.Wrap(err, 
fmt.Sprintf("query pipelines error but count it success. count:%d", count))
+                       }
+                       defer cursor.Close()
+                       fetched := 0
+                       errstr := ""
+                       for cursor.Next() {
+                               pipeline := &models.Pipeline{}
+                               err = db.Fetch(cursor, pipeline)
+                               if err != nil {
+                                       return nil, errors.Default.Wrap(err, 
fmt.Sprintf("failed to Fetch pipelines fetched:[%d],count:[%d]", fetched, 
count))
+                               }
+                               fetched++
+
+                               errstr += fmt.Sprintf("pipeline:[%d] on 
state:[%s] Pending it\r\n", pipeline.ID, pipeline.Status)
+                       }
+                       return nil, errors.Default.New(fmt.Sprintf("the 
blueprint is running fetched:[%d],count:[%d]:\r\n%s", fetched, count, errstr))
                }
        }
        planByte, err := errors.Convert01(json.Marshal(newPipeline.Plan))

Reply via email to