This is an automated email from the ASF dual-hosted git repository.

likyh pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.14 by this push:
     new 03efa422 fix: cancel pipeline doesn't work for pending item (#3852)
03efa422 is described below

commit 03efa422ad4b35626e9e7da811afef7114b0d1cc
Author: Klesh Wong <[email protected]>
AuthorDate: Mon Dec 5 14:44:52 2022 +0800

    fix: cancel pipeline doesn't work for pending item (#3852)
---
 models/task.go       |  1 +
 services/pipeline.go | 25 +++++++++++++++++++++++++
 2 files changed, 26 insertions(+)

diff --git a/models/task.go b/models/task.go
index 12005763..090c8b66 100644
--- a/models/task.go
+++ b/models/task.go
@@ -30,6 +30,7 @@ const (
        TASK_RUNNING   = "TASK_RUNNING"
        TASK_COMPLETED = "TASK_COMPLETED"
        TASK_FAILED    = "TASK_FAILED"
+       TASK_CANCELLED = "TASK_CANCELLED"
 )
 
 type TaskProgressDetail struct {
diff --git a/services/pipeline.go b/services/pipeline.go
index acee2b29..3c7297f9 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -311,6 +311,31 @@ func NotifyExternal(pipelineId uint64) errors.Error {
 
 // CancelPipeline FIXME ...
 func CancelPipeline(pipelineId uint64) errors.Error {
+       // prevent RunPipelineInQueue from consuming pending pipelines
+       cronLocker.Lock()
+       defer cronLocker.Unlock()
+       pipeline := &models.DbPipeline{}
+       err := db.First(pipeline, pipelineId).Error
+       if err != nil {
+               return errors.BadInput.New("pipeline not found")
+       }
+       if pipeline.Status == models.TASK_CREATED {
+               pipeline.Status = models.TASK_CANCELLED
+               result := db.Save(pipeline)
+               if result.Error != nil {
+                       return errors.Default.Wrap(result.Error, "faile to 
update pipeline")
+               }
+               // now, with RunPipelineInQueue being block and target pipeline 
got updated
+               // we should update the related tasks as well
+               result = db.Model(&models.Task{}).
+                       Where("pipeline_id = ?", pipelineId).
+                       Update("status", models.TASK_CANCELLED)
+               if result.Error != nil {
+                       return errors.Default.Wrap(result.Error, "faile to 
update pipeline tasks")
+               }
+               // the target pipeline is pending, no running, no need to 
perform the actual cancel operation
+               return nil
+       }
        if temporalClient != nil {
                return 
errors.Convert(temporalClient.CancelWorkflow(context.Background(), 
getTemporalWorkflowId(pipelineId), ""))
        }

Reply via email to