This is an automated email from the ASF dual-hosted git repository.
likyh pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v0.14 by this push:
new 03efa422 fix: cancel pipeline doesn't work for pending item (#3852)
03efa422 is described below
commit 03efa422ad4b35626e9e7da811afef7114b0d1cc
Author: Klesh Wong <[email protected]>
AuthorDate: Mon Dec 5 14:44:52 2022 +0800
fix: cancel pipeline doesn't work for pending item (#3852)
---
models/task.go | 1 +
services/pipeline.go | 25 +++++++++++++++++++++++++
2 files changed, 26 insertions(+)
diff --git a/models/task.go b/models/task.go
index 12005763..090c8b66 100644
--- a/models/task.go
+++ b/models/task.go
@@ -30,6 +30,7 @@ const (
TASK_RUNNING = "TASK_RUNNING"
TASK_COMPLETED = "TASK_COMPLETED"
TASK_FAILED = "TASK_FAILED"
+ TASK_CANCELLED = "TASK_CANCELLED"
)
type TaskProgressDetail struct {
diff --git a/services/pipeline.go b/services/pipeline.go
index acee2b29..3c7297f9 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -311,6 +311,31 @@ func NotifyExternal(pipelineId uint64) errors.Error {
// CancelPipeline FIXME ...
func CancelPipeline(pipelineId uint64) errors.Error {
+ // prevent RunPipelineInQueue from consuming pending pipelines
+ cronLocker.Lock()
+ defer cronLocker.Unlock()
+ pipeline := &models.DbPipeline{}
+ err := db.First(pipeline, pipelineId).Error
+ if err != nil {
+ return errors.BadInput.New("pipeline not found")
+ }
+ if pipeline.Status == models.TASK_CREATED {
+ pipeline.Status = models.TASK_CANCELLED
+ result := db.Save(pipeline)
+ if result.Error != nil {
+ return errors.Default.Wrap(result.Error, "faile to
update pipeline")
+ }
+ // now, with RunPipelineInQueue being block and target pipeline
got updated
+ // we should update the related tasks as well
+ result = db.Model(&models.Task{}).
+ Where("pipeline_id = ?", pipelineId).
+ Update("status", models.TASK_CANCELLED)
+ if result.Error != nil {
+ return errors.Default.Wrap(result.Error, "faile to
update pipeline tasks")
+ }
+ // the target pipeline is pending, no running, no need to
perform the actual cancel operation
+ return nil
+ }
if temporalClient != nil {
return
errors.Convert(temporalClient.CancelWorkflow(context.Background(),
getTemporalWorkflowId(pipelineId), ""))
}