This is an automated email from the ASF dual-hosted git repository.
likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 04dba47b6 fix: update `pipeline.finished_tasks` in time (#4098)
04dba47b6 is described below
commit 04dba47b6348c78ed9f25a135f7cb4a500bf774a
Author: Klesh Wong <[email protected]>
AuthorDate: Wed Jan 4 16:14:46 2023 +0800
fix: update `pipeline.finished_tasks` in time (#4098)
---
runner/run_pipeline.go | 7 -------
runner/run_task.go | 13 +++++++++++--
2 files changed, 11 insertions(+), 9 deletions(-)
diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index 04c964480..1d7ced8a4 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -90,13 +90,6 @@ func runPipelineTasks(
return err
}
}
-
- // update finishedTasks
- err = db.UpdateColumn(dbPipeline, "finished_tasks",
dal.Expr("finished_tasks + ?", len(row)))
- if err != nil {
- log.Error(err, "update pipeline state failed")
- return err
- }
}
log.Info("pipeline finished in %d ms: %v",
time.Now().UnixMilli()-dbPipeline.BeganAt.UnixMilli(), err)
return err
diff --git a/runner/run_task.go b/runner/run_task.go
index ef9ccce36..72a575ffd 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -92,7 +92,7 @@ func RunTask(
{ColumnName: "failed_sub_task", Value:
subTaskName},
})
if dbe != nil {
- log.Error(err, "failed to finalize task status
into db (task failed)")
+ log.Error(dbe, "failed to finalize task status
into db (task failed)")
}
} else {
dbe := db.UpdateColumns(task, []dal.DalSet{
@@ -102,9 +102,18 @@ func RunTask(
{ColumnName: "spent_seconds", Value:
spentSeconds},
})
if dbe != nil {
- log.Error(err, "failed to finalize task status
into db (task succeeded)")
+ log.Error(dbe, "failed to finalize task status
into db (task succeeded)")
}
}
+ // update finishedTasks
+ dbe := db.UpdateColumn(
+ &models.DbPipeline{},
+ "finished_tasks", dal.Expr("finished_tasks + 1"),
+ dal.Where("id=?", task.PipelineId),
+ )
+ if dbe != nil {
+ log.Error(dbe, "update pipeline state failed")
+ }
}()
// start execution