This is an automated email from the ASF dual-hosted git repository.
mappjzc pushed a commit to branch release-v0.16
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v0.16 by this push:
new 9b353a68d fix: cherry pick #4766 (#4768)
9b353a68d is described below
commit 9b353a68d7c58c6debbdf0e56f9d07bd91cc9fde
Author: mappjzc <[email protected]>
AuthorDate: Fri Mar 24 21:01:45 2023 +0800
fix: cherry pick #4766 (#4768)
Cherry-pick PR #4766 to v0.16
Nddtfjiang <[email protected]>
---
backend/server/api/blueprints/blueprints.go | 9 +++------
backend/server/services/blueprint.go | 7 ++++---
backend/server/services/pipeline_helper.go | 29 +++++++++++++++++++++++------
3 files changed, 30 insertions(+), 15 deletions(-)
diff --git a/backend/server/api/blueprints/blueprints.go b/backend/server/api/blueprints/blueprints.go
index e25b2cf0e..9810ba585 100644
--- a/backend/server/api/blueprints/blueprints.go
+++ b/backend/server/api/blueprints/blueprints.go
@@ -18,12 +18,13 @@ limitations under the License.
package blueprints
import (
+ "net/http"
+ "strconv"
+
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/server/api/shared"
"github.com/apache/incubator-devlake/server/services"
- "net/http"
- "strconv"
"github.com/gin-gonic/gin"
)
@@ -179,10 +180,6 @@ func Trigger(c *gin.Context) {
return
}
pipeline, err := services.TriggerBlueprint(id)
- if errors.Is(err, services.ErrBlueprintRunning) {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err, "the
blueprint is running"))
- return
- }
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error
triggering blueprint"))
return
diff --git a/backend/server/services/blueprint.go b/backend/server/services/blueprint.go
index 504853b33..17819f133 100644
--- a/backend/server/services/blueprint.go
+++ b/backend/server/services/blueprint.go
@@ -223,9 +223,9 @@ func ReloadBlueprints(c *cron.Cron) errors.Error {
if _, err := c.AddFunc(blueprint.CronConfig, func() {
pipeline, err := createPipelineByBlueprint(blueprint)
if err != nil {
- blueprintLog.Error(err, "run cron job failed")
+ blueprintLog.Error(err, fmt.Sprintf("run cron
job failed on blueprint:[%d][%s]", blueprint.ID, blueprint.Name))
} else {
- blueprintLog.Info("Run new cron job
successfully, pipeline id: %d", pipeline.ID)
+ blueprintLog.Info("Run new cron job
successfully,pipeline id:[%d] pipeline id:[%d]", blueprint.ID, pipeline.ID)
}
}); err != nil {
blueprintLog.Error(err, failToCreateCronJob)
@@ -248,6 +248,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint) (*models.Pipeline, e
plan, err = blueprint.UnmarshalPlan()
}
if err != nil {
+ blueprintLog.Error(err, fmt.Sprintf("failed to
MakePlanForBlueprint on blueprint:[%d][%s]", blueprint.ID, blueprint.Name))
return nil, err
}
newPipeline := models.NewPipeline{}
@@ -259,7 +260,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint) (*models.Pipeline, e
pipeline, err := CreatePipeline(&newPipeline)
// Return all created tasks to the User
if err != nil {
- blueprintLog.Error(err, failToCreateCronJob)
+ blueprintLog.Error(err, fmt.Sprintf("%s on blueprint:[%d][%s]",
failToCreateCronJob, blueprint.ID, blueprint.Name))
return nil, errors.Convert(err)
}
return pipeline, nil
diff --git a/backend/server/services/pipeline_helper.go b/backend/server/services/pipeline_helper.go
index 03f3e322b..c0dd11cb6 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -20,28 +20,45 @@ package services
import (
"encoding/json"
"fmt"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
)
-// ErrBlueprintRunning indicates there is a running pipeline with the
specified blueprint_id
-var ErrBlueprintRunning = errors.Default.New("the blueprint is running")
-
// CreateDbPipeline returns a NewPipeline
func CreateDbPipeline(newPipeline *models.NewPipeline) (*models.Pipeline,
errors.Error) {
cronLocker.Lock()
defer cronLocker.Unlock()
if newPipeline.BlueprintId > 0 {
- count, err := db.Count(
+ clauses := []dal.Clause{
dal.From(&models.Pipeline{}),
dal.Where("blueprint_id = ? AND status IN ?",
newPipeline.BlueprintId, models.PendingTaskStatus),
- )
+ }
+ count, err := db.Count(clauses...)
if err != nil {
return nil, errors.Default.Wrap(err, "query pipelines
error")
}
+ // some pipelines are running; get their details and output them.
if count > 0 {
- return nil, ErrBlueprintRunning
+ cursor, err := db.Cursor(clauses...)
+ if err != nil {
+ return nil, errors.Default.Wrap(err,
fmt.Sprintf("query pipelines error but count it success. count:%d", count))
+ }
+ defer cursor.Close()
+ fetched := 0
+ errstr := ""
+ for cursor.Next() {
+ pipeline := &models.Pipeline{}
+ err = db.Fetch(cursor, pipeline)
+ if err != nil {
+ return nil, errors.Default.Wrap(err,
fmt.Sprintf("failed to Fetch pipelines fetched:[%d],count:[%d]", fetched,
count))
+ }
+ fetched++
+
+ errstr += fmt.Sprintf("pipeline:[%d] on
state:[%s] Pending it\r\n", pipeline.ID, pipeline.Status)
+ }
+ return nil, errors.Default.New(fmt.Sprintf("the
blueprint is running fetched:[%d],count:[%d]:\r\n%s", fetched, count, errstr))
}
}
planByte, err := errors.Convert01(json.Marshal(newPipeline.Plan))