This is an automated email from the ASF dual-hosted git repository.

likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 04dba47b6 fix: update `pipeline.finished_tasks`  in time (#4098)
04dba47b6 is described below

commit 04dba47b6348c78ed9f25a135f7cb4a500bf774a
Author: Klesh Wong <[email protected]>
AuthorDate: Wed Jan 4 16:14:46 2023 +0800

    fix: update `pipeline.finished_tasks`  in time (#4098)
---
 runner/run_pipeline.go |  7 -------
 runner/run_task.go     | 13 +++++++++++--
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index 04c964480..1d7ced8a4 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -90,13 +90,6 @@ func runPipelineTasks(
                                return err
                        }
                }
-
-               // update finishedTasks
-               err = db.UpdateColumn(dbPipeline, "finished_tasks", 
dal.Expr("finished_tasks + ?", len(row)))
-               if err != nil {
-                       log.Error(err, "update pipeline state failed")
-                       return err
-               }
        }
        log.Info("pipeline finished in %d ms: %v", 
time.Now().UnixMilli()-dbPipeline.BeganAt.UnixMilli(), err)
        return err
diff --git a/runner/run_task.go b/runner/run_task.go
index ef9ccce36..72a575ffd 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -92,7 +92,7 @@ func RunTask(
                                {ColumnName: "failed_sub_task", Value: 
subTaskName},
                        })
                        if dbe != nil {
-                               log.Error(err, "failed to finalize task status 
into db (task failed)")
+                               log.Error(dbe, "failed to finalize task status 
into db (task failed)")
                        }
                } else {
                        dbe := db.UpdateColumns(task, []dal.DalSet{
@@ -102,9 +102,18 @@ func RunTask(
                                {ColumnName: "spent_seconds", Value: 
spentSeconds},
                        })
                        if dbe != nil {
-                               log.Error(err, "failed to finalize task status 
into db (task succeeded)")
+                               log.Error(dbe, "failed to finalize task status 
into db (task succeeded)")
                        }
                }
+               // update finishedTasks
+               dbe := db.UpdateColumn(
+                       &models.DbPipeline{},
+                       "finished_tasks", dal.Expr("finished_tasks + 1"),
+                       dal.Where("id=?", task.PipelineId),
+               )
+               if dbe != nil {
+                       log.Error(dbe, "update pipeline state failed")
+               }
        }()
 
        // start execution

Reply via email to