This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new e5ee65ee0 feat: resume pipelines on restart (#7229)
e5ee65ee0 is described below
commit e5ee65ee0ea8b39e4c1edaa54851d4869c1f8a88
Author: Klesh Wong <[email protected]>
AuthorDate: Mon Apr 1 15:34:27 2024 +0800
feat: resume pipelines on restart (#7229)
* feat: resume pipelines on restart
* feat: gitextractor clone phase must be executed on resume
* fix: subtasks always get skipped
---
backend/core/config/config_viper.go | 1 +
backend/core/models/task.go | 1 +
backend/core/plugin/plugin_task.go | 1 +
backend/core/runner/run_pipeline.go | 5 ++-
backend/core/runner/run_task.go | 47 +++++++++++++++--------
backend/plugins/gitextractor/tasks/clone.go | 1 +
backend/server/services/pipeline.go | 58 +++++++++++++++--------------
7 files changed, 69 insertions(+), 45 deletions(-)
diff --git a/backend/core/config/config_viper.go
b/backend/core/config/config_viper.go
index 3e4e4b0c8..ea5ed6da9 100644
--- a/backend/core/config/config_viper.go
+++ b/backend/core/config/config_viper.go
@@ -104,6 +104,7 @@ func setDefaultValue(v *viper.Viper) {
v.SetDefault("PLUGIN_DIR", "bin/plugins")
v.SetDefault("REMOTE_PLUGIN_DIR", "python/plugins")
v.SetDefault("SWAGGER_DOCS_DIR", "resources/swagger")
+ v.SetDefault("RESUME_PIPELINES", true)
}
func init() {
diff --git a/backend/core/models/task.go b/backend/core/models/task.go
index fd1b7ffbf..661ba091a 100644
--- a/backend/core/models/task.go
+++ b/backend/core/models/task.go
@@ -26,6 +26,7 @@ import (
const (
TASK_CREATED = "TASK_CREATED"
TASK_RERUN = "TASK_RERUN"
+ TASK_RESUME = "TASK_RESUME"
TASK_RUNNING = "TASK_RUNNING"
TASK_COMPLETED = "TASK_COMPLETED"
TASK_FAILED = "TASK_FAILED"
diff --git a/backend/core/plugin/plugin_task.go
b/backend/core/plugin/plugin_task.go
index 357fa2dc4..136f4f17e 100644
--- a/backend/core/plugin/plugin_task.go
+++ b/backend/core/plugin/plugin_task.go
@@ -106,6 +106,7 @@ type SubTaskMeta struct {
Dependencies []*SubTaskMeta
DependencyTables []string
ProductTables []string
+ ForceRunOnResume bool // Should a subtask be run again on resume even though it finished before
}
// PluginTask Implement this interface to let framework run tasks for you
diff --git a/backend/core/runner/run_pipeline.go
b/backend/core/runner/run_pipeline.go
index 24a07453b..7df1d994d 100644
--- a/backend/core/runner/run_pipeline.go
+++ b/backend/core/runner/run_pipeline.go
@@ -19,11 +19,12 @@ package runner
import (
gocontext "context"
+ "time"
+
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
- "time"
)
// RunPipeline FIXME ...
@@ -37,7 +38,7 @@ func RunPipeline(
var tasks []models.Task
err := db.All(
&tasks,
- dal.Where("pipeline_id = ? AND status in ?", pipelineId,
[]string{models.TASK_CREATED, models.TASK_RERUN}),
+ dal.Where("pipeline_id = ? AND status in ?", pipelineId,
[]string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}),
dal.Orderby("pipeline_row, pipeline_col"),
)
if err != nil {
diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go
index 7dbbe5549..0b4e0604c 100644
--- a/backend/core/runner/run_task.go
+++ b/backend/core/runner/run_task.go
@@ -47,9 +47,6 @@ func RunTask(
if err := db.First(task, dal.Where("id = ?", taskId)); err != nil {
return err
}
- if task.Status == models.TASK_COMPLETED {
- return errors.Default.New("invalid task status")
- }
dbPipeline := &models.Pipeline{}
if err := db.First(dbPipeline, dal.Where("id = ? ", task.PipelineId));
err != nil {
return err
@@ -60,6 +57,9 @@ func RunTask(
return err
}
beganAt := time.Now()
+ if dbPipeline.BeganAt != nil {
+ beganAt = *dbPipeline.BeganAt
+ }
// make sure task status always correct even if it panicked
defer func() {
if r := recover(); r != nil {
@@ -119,6 +119,10 @@ func RunTask(
}
}()
+ if task.Status == models.TASK_COMPLETED {
+ return nil
+ }
+
// start execution
logger.Info("start executing task: %d", task.ID)
dbe := db.UpdateColumns(task, []dal.DalSet{
@@ -298,7 +302,6 @@ func RunPluginSubTasks(
continue
}
// run subtask
- logger.Info("executing subtask %s", subtaskMeta.Name)
subtaskNumber++
if progress != nil {
progress <- plugin.RunningProgress{
@@ -307,18 +310,32 @@ func RunPluginSubTasks(
SubTaskNumber: subtaskNumber,
}
}
- err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber,
subtaskMeta.EntryPoint)
- if err != nil {
- err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask
%s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta))
- logger.Error(err, "")
- where := dal.Where("task_id = ? and name = ?", task.ID,
subtaskCtx.GetName())
- if err := basicRes.GetDal().UpdateColumns(subtask,
[]dal.DalSet{
- {ColumnName: "is_failed", Value: 1},
- {ColumnName: "message", Value: err.Error()},
- }, where); err != nil {
- basicRes.GetLogger().Error(err, "error writing
subtask %v status to DB", subtaskCtx.GetName())
+ subtaskFinsied := false
+ if !subtaskMeta.ForceRunOnResume {
+ sfc := errors.Must1(
+ basicRes.GetDal().Count(
+ dal.From(&models.Subtask{}),
dal.Where("task_id = ? AND name = ? AND finished_at IS NOT NULL", task.ID,
subtaskMeta.Name),
+ ),
+ )
+ subtaskFinsied = sfc > 0
+ }
+ if subtaskFinsied {
+ logger.Info("subtask %s already finished previously",
subtaskMeta.Name)
+ } else {
+ logger.Info("executing subtask %s", subtaskMeta.Name)
+ err = runSubtask(basicRes, subtaskCtx, task.ID,
subtaskNumber, subtaskMeta.EntryPoint)
+ if err != nil {
+ err = errors.SubtaskErr.Wrap(err,
fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name),
errors.WithData(&subtaskMeta))
+ logger.Error(err, "")
+ where := dal.Where("task_id = ? and name = ?",
task.ID, subtaskCtx.GetName())
+ if err :=
basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{
+ {ColumnName: "is_failed", Value: 1},
+ {ColumnName: "message", Value:
err.Error()},
+ }, where); err != nil {
+ basicRes.GetLogger().Error(err, "error
writing subtask %v status to DB", subtaskCtx.GetName())
+ }
+ return err
}
- return err
}
taskCtx.IncProgress(1)
}
diff --git a/backend/plugins/gitextractor/tasks/clone.go
b/backend/plugins/gitextractor/tasks/clone.go
index c27921a63..599d4adb6 100644
--- a/backend/plugins/gitextractor/tasks/clone.go
+++ b/backend/plugins/gitextractor/tasks/clone.go
@@ -38,6 +38,7 @@ var CloneGitRepoMeta = plugin.SubTaskMeta{
Required: true,
Description: "clone a git repo, make it available to later tasks",
DomainTypes: []string{plugin.DOMAIN_TYPE_CODE},
+ ForceRunOnResume: true,
}
func useGoGit(subTaskCtx plugin.SubTaskContext, taskData
*GitExtractorTaskData) bool {
diff --git a/backend/server/services/pipeline.go
b/backend/server/services/pipeline.go
index 2a1ac8ccb..80ce47e4e 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -79,34 +79,14 @@ func pipelineServiceInit() {
}
// standalone mode: reset pipeline status
- errMsg := "The process was terminated unexpectedly"
- err := db.UpdateColumns(
- &models.Pipeline{},
- []dal.DalSet{
- {ColumnName: "status", Value: models.TASK_FAILED},
- {ColumnName: "message", Value: errMsg},
- },
- dal.Where("status = ?", models.TASK_RUNNING),
- )
- if err != nil {
- panic(err)
- }
- err = db.UpdateColumns(
- &models.Task{},
- []dal.DalSet{
- {ColumnName: "status", Value: models.TASK_FAILED},
- {ColumnName: "message", Value: errMsg},
- },
- dal.Where("status = ?", models.TASK_RUNNING),
- )
- if err != nil {
- panic(err)
+ if cfg.GetBool("RESUME_PIPELINES") {
+ markInterruptedPipelineAs(models.TASK_RESUME)
+ } else {
+ markInterruptedPipelineAs(models.TASK_FAILED)
}
- err = ReloadBlueprints()
- if err != nil {
- panic(err)
- }
+ // load cronjobs for blueprints
+ errors.Must(ReloadBlueprints())
var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL")
if pipelineMaxParallel < 0 {
@@ -120,6 +100,23 @@ func pipelineServiceInit() {
go RunPipelineInQueue(pipelineMaxParallel)
}
+func markInterruptedPipelineAs(status string) {
+ errors.Must(db.UpdateColumns(
+ &models.Pipeline{},
+ []dal.DalSet{
+ {ColumnName: "status", Value: status},
+ },
+ dal.Where("status = ?", models.TASK_RUNNING),
+ ))
+ errors.Must(db.UpdateColumns(
+ &models.Task{},
+ []dal.DalSet{
+ {ColumnName: "status", Value: status},
+ },
+ dal.Where("status = ?", models.TASK_RUNNING),
+ ))
+}
+
// CreatePipeline and return the model
func CreatePipeline(newPipeline *models.NewPipeline, shouldSanitize bool)
(*models.Pipeline, errors.Error) {
pipeline, err := CreateDbPipeline(newPipeline)
@@ -238,7 +235,7 @@ func dequeuePipeline(runningParallelLabels []string)
(pipeline *models.Pipeline,
// prepare query to find an appropriate pipeline to execute
pipeline = &models.Pipeline{}
err = tx.First(pipeline,
- dal.Where("status IN ?", []string{models.TASK_CREATED,
models.TASK_RERUN}),
+ dal.Where("status IN ?", []string{models.TASK_CREATED,
models.TASK_RERUN, models.TASK_RESUME}),
dal.Join(
`left join _devlake_pipeline_labels ON
_devlake_pipeline_labels.pipeline_id =
_devlake_pipelines.id AND
@@ -254,11 +251,16 @@ func dequeuePipeline(runningParallelLabels []string)
(pipeline *models.Pipeline,
)
if err == nil {
// mark the pipeline running, now we want a write lock
+ if pipeline.BeganAt == nil {
+ now := time.Now()
+ pipeline.BeganAt = &now
+ globalPipelineLog.Info("resumed pipeline #%d",
pipeline.ID)
+ }
errors.Must(tx.LockTables(dal.LockTables{{Table:
"_devlake_pipelines", Exclusive: true}}))
err = tx.UpdateColumns(&models.Pipeline{}, []dal.DalSet{
{ColumnName: "status", Value: models.TASK_RUNNING},
{ColumnName: "message", Value: ""},
- {ColumnName: "began_at", Value: time.Now()},
+ {ColumnName: "began_at", Value: pipeline.BeganAt},
}, dal.Where("id = ?", pipeline.ID))
if err != nil {
panic(err)