This is an automated email from the ASF dual-hosted git repository.

likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 0467a4db4 fix: add some log; wait when query fail in RunPipelineInQueue (#4551)
0467a4db4 is described below

commit 0467a4db41b71ac5cc385e1b555a0503f2411408
Author: Likyh <[email protected]>
AuthorDate: Wed Mar 1 17:33:02 2023 +0800

    fix: add some log; wait when query fail in RunPipelineInQueue (#4551)
    
    * fix: add some log; wait when query fail in RunPipelineInQueue
    
    * fix: append
---
 backend/core/models/blueprint.go              |  2 +-
 backend/plugins/webhook/api/blueprint_v200.go |  2 +-
 backend/server/services/pipeline.go           | 10 ++++++----
 3 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/backend/core/models/blueprint.go b/backend/core/models/blueprint.go
index dec9ac119..2f2643c98 100644
--- a/backend/core/models/blueprint.go
+++ b/backend/core/models/blueprint.go
@@ -60,7 +60,7 @@ func (bp *Blueprint) UnmarshalPlan() (plugin.PipelinePlan, errors.Error) {
        var plan plugin.PipelinePlan
        err := errors.Convert(json.Unmarshal(bp.Plan, &plan))
        if err != nil {
-               return nil, errors.Convert(err)
+               return nil, errors.Default.Wrap(err, `unmarshal plan fail`)
        }
        return plan, nil
 }
diff --git a/backend/plugins/webhook/api/blueprint_v200.go b/backend/plugins/webhook/api/blueprint_v200.go
index f169fe620..b415d4fca 100644
--- a/backend/plugins/webhook/api/blueprint_v200.go
+++ b/backend/plugins/webhook/api/blueprint_v200.go
@@ -31,7 +31,7 @@ func MakeDataSourcePipelinePlanV200(connectionId uint64) (plugin.PipelinePlan, [
        connection := &models.WebhookConnection{}
        err := connectionHelper.FirstById(connection, connectionId)
        if err != nil {
-               return nil, nil, err
+               return nil, nil, errors.Default.Wrap(err, `cannot find webhook connection`)
        }
 
        scopes := make([]plugin.Scope, 0)
diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go
index 07be063eb..93ce40145 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -191,13 +191,15 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
                                dal.Orderby("id ASC"),
                                dal.Limit(1),
                        )
-                       if err != nil && !db.IsErrorNotFound(err) {
-                               globalPipelineLog.Error(err, "dequeue failed")
-                       }
                        cronLocker.Unlock()
-                       if !db.IsErrorNotFound(err) {
+                       if err == nil {
+                               // next pipeline found
                                break
                        }
+                       if !db.IsErrorNotFound(err) {
+                               // log unexpected err
+                               globalPipelineLog.Error(err, "dequeue failed")
+                       }
                        time.Sleep(time.Second)
                }
 

Reply via email to