This is an automated email from the ASF dual-hosted git repository.
likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 0467a4db4 fix: add some log; wait when query fail in RunPipelineInQueue (#4551)
0467a4db4 is described below
commit 0467a4db41b71ac5cc385e1b555a0503f2411408
Author: Likyh <[email protected]>
AuthorDate: Wed Mar 1 17:33:02 2023 +0800
fix: add some log; wait when query fail in RunPipelineInQueue (#4551)
* fix: add some log; wait when query fail in RunPipelineInQueue
* fix: append
---
backend/core/models/blueprint.go | 2 +-
backend/plugins/webhook/api/blueprint_v200.go | 2 +-
backend/server/services/pipeline.go | 10 ++++++----
3 files changed, 8 insertions(+), 6 deletions(-)
diff --git a/backend/core/models/blueprint.go b/backend/core/models/blueprint.go
index dec9ac119..2f2643c98 100644
--- a/backend/core/models/blueprint.go
+++ b/backend/core/models/blueprint.go
@@ -60,7 +60,7 @@ func (bp *Blueprint) UnmarshalPlan() (plugin.PipelinePlan, errors.Error) {
var plan plugin.PipelinePlan
err := errors.Convert(json.Unmarshal(bp.Plan, &plan))
if err != nil {
- return nil, errors.Convert(err)
+ return nil, errors.Default.Wrap(err, `unmarshal plan fail`)
}
return plan, nil
}
diff --git a/backend/plugins/webhook/api/blueprint_v200.go b/backend/plugins/webhook/api/blueprint_v200.go
index f169fe620..b415d4fca 100644
--- a/backend/plugins/webhook/api/blueprint_v200.go
+++ b/backend/plugins/webhook/api/blueprint_v200.go
@@ -31,7 +31,7 @@ func MakeDataSourcePipelinePlanV200(connectionId uint64) (plugin.PipelinePlan, [
connection := &models.WebhookConnection{}
err := connectionHelper.FirstById(connection, connectionId)
if err != nil {
- return nil, nil, err
+ return nil, nil, errors.Default.Wrap(err, `cannot find webhook connection`)
}
scopes := make([]plugin.Scope, 0)
diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go
index 07be063eb..93ce40145 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -191,13 +191,15 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
dal.Orderby("id ASC"),
dal.Limit(1),
)
- if err != nil && !db.IsErrorNotFound(err) {
- globalPipelineLog.Error(err, "dequeue failed")
- }
cronLocker.Unlock()
- if !db.IsErrorNotFound(err) {
+ if err == nil {
+ // next pipeline found
break
}
+ if !db.IsErrorNotFound(err) {
+ // log unexpected err
+ globalPipelineLog.Error(err, "dequeue failed")
+ }
time.Sleep(time.Second)
}