This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new e5ee65ee0 feat: resume pipelines on restart (#7229)
e5ee65ee0 is described below

commit e5ee65ee0ea8b39e4c1edaa54851d4869c1f8a88
Author: Klesh Wong <[email protected]>
AuthorDate: Mon Apr 1 15:34:27 2024 +0800

    feat: resume pipelines on restart (#7229)
    
    * feat: resume pipelines on restart
    
    * feat: gitextractor clone phase must be executed on resume
    
    * fix: subtasks always get skipped
---
 backend/core/config/config_viper.go         |  1 +
 backend/core/models/task.go                 |  1 +
 backend/core/plugin/plugin_task.go          |  1 +
 backend/core/runner/run_pipeline.go         |  5 ++-
 backend/core/runner/run_task.go             | 47 +++++++++++++++--------
 backend/plugins/gitextractor/tasks/clone.go |  1 +
 backend/server/services/pipeline.go         | 58 +++++++++++++++--------------
 7 files changed, 69 insertions(+), 45 deletions(-)

diff --git a/backend/core/config/config_viper.go 
b/backend/core/config/config_viper.go
index 3e4e4b0c8..ea5ed6da9 100644
--- a/backend/core/config/config_viper.go
+++ b/backend/core/config/config_viper.go
@@ -104,6 +104,7 @@ func setDefaultValue(v *viper.Viper) {
        v.SetDefault("PLUGIN_DIR", "bin/plugins")
        v.SetDefault("REMOTE_PLUGIN_DIR", "python/plugins")
        v.SetDefault("SWAGGER_DOCS_DIR", "resources/swagger")
+       v.SetDefault("RESUME_PIPELINES", true)
 }
 
 func init() {
diff --git a/backend/core/models/task.go b/backend/core/models/task.go
index fd1b7ffbf..661ba091a 100644
--- a/backend/core/models/task.go
+++ b/backend/core/models/task.go
@@ -26,6 +26,7 @@ import (
 const (
        TASK_CREATED   = "TASK_CREATED"
        TASK_RERUN     = "TASK_RERUN"
+       TASK_RESUME    = "TASK_RESUME"
        TASK_RUNNING   = "TASK_RUNNING"
        TASK_COMPLETED = "TASK_COMPLETED"
        TASK_FAILED    = "TASK_FAILED"
diff --git a/backend/core/plugin/plugin_task.go 
b/backend/core/plugin/plugin_task.go
index 357fa2dc4..136f4f17e 100644
--- a/backend/core/plugin/plugin_task.go
+++ b/backend/core/plugin/plugin_task.go
@@ -106,6 +106,7 @@ type SubTaskMeta struct {
        Dependencies     []*SubTaskMeta
        DependencyTables []string
        ProductTables    []string
+       ForceRunOnResume bool // Should a subtask be run despite having 
finished before
 }
 
 // PluginTask Implement this interface to let framework run tasks for you
diff --git a/backend/core/runner/run_pipeline.go 
b/backend/core/runner/run_pipeline.go
index 24a07453b..7df1d994d 100644
--- a/backend/core/runner/run_pipeline.go
+++ b/backend/core/runner/run_pipeline.go
@@ -19,11 +19,12 @@ package runner
 
 import (
        gocontext "context"
+       "time"
+
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/models"
-       "time"
 )
 
 // RunPipeline FIXME ...
@@ -37,7 +38,7 @@ func RunPipeline(
        var tasks []models.Task
        err := db.All(
                &tasks,
-               dal.Where("pipeline_id = ? AND status in ?", pipelineId, 
[]string{models.TASK_CREATED, models.TASK_RERUN}),
+               dal.Where("pipeline_id = ? AND status in ?", pipelineId, 
[]string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}),
                dal.Orderby("pipeline_row, pipeline_col"),
        )
        if err != nil {
diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go
index 7dbbe5549..0b4e0604c 100644
--- a/backend/core/runner/run_task.go
+++ b/backend/core/runner/run_task.go
@@ -47,9 +47,6 @@ func RunTask(
        if err := db.First(task, dal.Where("id = ?", taskId)); err != nil {
                return err
        }
-       if task.Status == models.TASK_COMPLETED {
-               return errors.Default.New("invalid task status")
-       }
        dbPipeline := &models.Pipeline{}
        if err := db.First(dbPipeline, dal.Where("id = ? ", task.PipelineId)); 
err != nil {
                return err
@@ -60,6 +57,9 @@ func RunTask(
                return err
        }
        beganAt := time.Now()
+       if dbPipeline.BeganAt != nil {
+               beganAt = *dbPipeline.BeganAt
+       }
        // make sure task status always correct even if it panicked
        defer func() {
                if r := recover(); r != nil {
@@ -119,6 +119,10 @@ func RunTask(
                }
        }()
 
+       if task.Status == models.TASK_COMPLETED {
+               return nil
+       }
+
        // start execution
        logger.Info("start executing task: %d", task.ID)
        dbe := db.UpdateColumns(task, []dal.DalSet{
@@ -298,7 +302,6 @@ func RunPluginSubTasks(
                        continue
                }
                // run subtask
-               logger.Info("executing subtask %s", subtaskMeta.Name)
                subtaskNumber++
                if progress != nil {
                        progress <- plugin.RunningProgress{
@@ -307,18 +310,32 @@ func RunPluginSubTasks(
                                SubTaskNumber: subtaskNumber,
                        }
                }
-               err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber, 
subtaskMeta.EntryPoint)
-               if err != nil {
-                       err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask 
%s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta))
-                       logger.Error(err, "")
-                       where := dal.Where("task_id = ? and name = ?", task.ID, 
subtaskCtx.GetName())
-                       if err := basicRes.GetDal().UpdateColumns(subtask, 
[]dal.DalSet{
-                               {ColumnName: "is_failed", Value: 1},
-                               {ColumnName: "message", Value: err.Error()},
-                       }, where); err != nil {
-                               basicRes.GetLogger().Error(err, "error writing 
subtask %v status to DB", subtaskCtx.GetName())
+               subtaskFinsied := false
+               if !subtaskMeta.ForceRunOnResume {
+                       sfc := errors.Must1(
+                               basicRes.GetDal().Count(
+                                       dal.From(&models.Subtask{}), 
dal.Where("task_id = ? AND name = ? AND finished_at IS NOT NULL", task.ID, 
subtaskMeta.Name),
+                               ),
+                       )
+                       subtaskFinsied = sfc > 0
+               }
+               if subtaskFinsied {
+                       logger.Info("subtask %s already finished previously", 
subtaskMeta.Name)
+               } else {
+                       logger.Info("executing subtask %s", subtaskMeta.Name)
+                       err = runSubtask(basicRes, subtaskCtx, task.ID, 
subtaskNumber, subtaskMeta.EntryPoint)
+                       if err != nil {
+                               err = errors.SubtaskErr.Wrap(err, 
fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), 
errors.WithData(&subtaskMeta))
+                               logger.Error(err, "")
+                               where := dal.Where("task_id = ? and name = ?", 
task.ID, subtaskCtx.GetName())
+                               if err := 
basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{
+                                       {ColumnName: "is_failed", Value: 1},
+                                       {ColumnName: "message", Value: 
err.Error()},
+                               }, where); err != nil {
+                                       basicRes.GetLogger().Error(err, "error 
writing subtask %v status to DB", subtaskCtx.GetName())
+                               }
+                               return err
                        }
-                       return err
                }
                taskCtx.IncProgress(1)
        }
diff --git a/backend/plugins/gitextractor/tasks/clone.go 
b/backend/plugins/gitextractor/tasks/clone.go
index c27921a63..599d4adb6 100644
--- a/backend/plugins/gitextractor/tasks/clone.go
+++ b/backend/plugins/gitextractor/tasks/clone.go
@@ -38,6 +38,7 @@ var CloneGitRepoMeta = plugin.SubTaskMeta{
        Required:         true,
        Description:      "clone a git repo, make it available to later tasks",
        DomainTypes:      []string{plugin.DOMAIN_TYPE_CODE},
+       ForceRunOnResume: true,
 }
 
 func useGoGit(subTaskCtx plugin.SubTaskContext, taskData 
*GitExtractorTaskData) bool {
diff --git a/backend/server/services/pipeline.go 
b/backend/server/services/pipeline.go
index 2a1ac8ccb..80ce47e4e 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -79,34 +79,14 @@ func pipelineServiceInit() {
        }
 
        // standalone mode: reset pipeline status
-       errMsg := "The process was terminated unexpectedly"
-       err := db.UpdateColumns(
-               &models.Pipeline{},
-               []dal.DalSet{
-                       {ColumnName: "status", Value: models.TASK_FAILED},
-                       {ColumnName: "message", Value: errMsg},
-               },
-               dal.Where("status = ?", models.TASK_RUNNING),
-       )
-       if err != nil {
-               panic(err)
-       }
-       err = db.UpdateColumns(
-               &models.Task{},
-               []dal.DalSet{
-                       {ColumnName: "status", Value: models.TASK_FAILED},
-                       {ColumnName: "message", Value: errMsg},
-               },
-               dal.Where("status = ?", models.TASK_RUNNING),
-       )
-       if err != nil {
-               panic(err)
+       if cfg.GetBool("RESUME_PIPELINES") {
+               markInterruptedPipelineAs(models.TASK_RESUME)
+       } else {
+               markInterruptedPipelineAs(models.TASK_FAILED)
        }
 
-       err = ReloadBlueprints()
-       if err != nil {
-               panic(err)
-       }
+       // load cronjobs for blueprints
+       errors.Must(ReloadBlueprints())
 
        var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL")
        if pipelineMaxParallel < 0 {
@@ -120,6 +100,23 @@ func pipelineServiceInit() {
        go RunPipelineInQueue(pipelineMaxParallel)
 }
 
+func markInterruptedPipelineAs(status string) {
+       errors.Must(db.UpdateColumns(
+               &models.Pipeline{},
+               []dal.DalSet{
+                       {ColumnName: "status", Value: status},
+               },
+               dal.Where("status = ?", models.TASK_RUNNING),
+       ))
+       errors.Must(db.UpdateColumns(
+               &models.Task{},
+               []dal.DalSet{
+                       {ColumnName: "status", Value: status},
+               },
+               dal.Where("status = ?", models.TASK_RUNNING),
+       ))
+}
+
 // CreatePipeline and return the model
 func CreatePipeline(newPipeline *models.NewPipeline, shouldSanitize bool) 
(*models.Pipeline, errors.Error) {
        pipeline, err := CreateDbPipeline(newPipeline)
@@ -238,7 +235,7 @@ func dequeuePipeline(runningParallelLabels []string) 
(pipeline *models.Pipeline,
        // prepare query to find an appropriate pipeline to execute
        pipeline = &models.Pipeline{}
        err = tx.First(pipeline,
-               dal.Where("status IN ?", []string{models.TASK_CREATED, 
models.TASK_RERUN}),
+               dal.Where("status IN ?", []string{models.TASK_CREATED, 
models.TASK_RERUN, models.TASK_RESUME}),
                dal.Join(
                        `left join _devlake_pipeline_labels ON
                                _devlake_pipeline_labels.pipeline_id = 
_devlake_pipelines.id AND
@@ -254,11 +251,16 @@ func dequeuePipeline(runningParallelLabels []string) 
(pipeline *models.Pipeline,
        )
        if err == nil {
                // mark the pipeline running, now we want a write lock
+               if pipeline.BeganAt == nil {
+                       now := time.Now()
+                       pipeline.BeganAt = &now
+                       globalPipelineLog.Info("resumed pipeline #%d", 
pipeline.ID)
+               }
                errors.Must(tx.LockTables(dal.LockTables{{Table: 
"_devlake_pipelines", Exclusive: true}}))
                err = tx.UpdateColumns(&models.Pipeline{}, []dal.DalSet{
                        {ColumnName: "status", Value: models.TASK_RUNNING},
                        {ColumnName: "message", Value: ""},
-                       {ColumnName: "began_at", Value: time.Now()},
+                       {ColumnName: "began_at", Value: pipeline.BeganAt},
                }, dal.Where("id = ?", pipeline.ID))
                if err != nil {
                        panic(err)

Reply via email to