This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new f8f3cdeb [feat-2433]: record execution time for plugin subtasks (#2455)
f8f3cdeb is described below

commit f8f3cdeb72c12ce2d6a6de9bc9041125a2d8214f
Author: Keon Amini <[email protected]>
AuthorDate: Tue Aug 2 20:32:54 2022 -0500

    [feat-2433]: record execution time for plugin subtasks (#2455)
    
    * feat(framework): add execution time for plugin subtasks (#2433)
    
    * feat: add subtask number to subtask model
    
    * feat: Remove batched saving in favor of sequential
    
    closes #2433
---
 .../20220711_add_subtasks_table.go                 | 39 +++++++++++++++++
 models/migrationscripts/register.go                |  1 +
 models/task.go                                     | 14 ++++++
 plugins/core/plugin_task.go                        |  1 -
 runner/directrun.go                                |  2 +
 runner/run_task.go                                 | 50 ++++++++++++++++++----
 6 files changed, 98 insertions(+), 9 deletions(-)

diff --git a/models/migrationscripts/20220711_add_subtasks_table.go 
b/models/migrationscripts/20220711_add_subtasks_table.go
new file mode 100644
index 00000000..3798289f
--- /dev/null
+++ b/models/migrationscripts/20220711_add_subtasks_table.go
@@ -0,0 +1,39 @@
+package migrationscripts
+
+import (
+       "context"
+       commonArchived 
"github.com/apache/incubator-devlake/models/migrationscripts/archived"
+       "gorm.io/gorm"
+       "time"
+)
+
+type addSubtasksTable struct {
+}
+
+// Subtask20220711 DB snapshot model of models.Subtask
+type Subtask20220711 struct {
+       commonArchived.Model
+       TaskID       uint64     `json:"task_id" gorm:"index"`
+       SubtaskName  string     `json:"name" gorm:"column:name;index"`
+       Number       int        `json:"number"`
+       BeganAt      *time.Time `json:"beganAt"`
+       FinishedAt   *time.Time `json:"finishedAt" gorm:"index"`
+       SpentSeconds int64      `json:"spentSeconds"`
+}
+
+func (s Subtask20220711) TableName() string {
+       return "_devlake_subtasks"
+}
+
+func (u addSubtasksTable) Up(_ context.Context, db *gorm.DB) error {
+       err := db.Migrator().AutoMigrate(&Subtask20220711{})
+       return err
+}
+
+func (u addSubtasksTable) Version() uint64 {
+       return 20220711000001
+}
+
+func (u addSubtasksTable) Name() string {
+       return "create subtask schema"
+}
diff --git a/models/migrationscripts/register.go 
b/models/migrationscripts/register.go
index c49fab8e..47591a18 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -33,5 +33,6 @@ func All() []migration.Script {
                new(removeNotes),
                new(addProjectMapping),
                new(addNoPKModelToCommitParent),
+               new(addSubtasksTable),
        }
 }
diff --git a/models/task.go b/models/task.go
index 9f78d6cf..12005763 100644
--- a/models/task.go
+++ b/models/task.go
@@ -68,6 +68,20 @@ type NewTask struct {
        PipelineCol int    `json:"-"`
 }
 
+type Subtask struct {
+       common.Model
+       TaskID       uint64     `json:"task_id" gorm:"index"`
+       Name         string     `json:"name" gorm:"index"`
+       Number       int        `json:"number"`
+       BeganAt      *time.Time `json:"beganAt"`
+       FinishedAt   *time.Time `json:"finishedAt" gorm:"index"`
+       SpentSeconds int64      `json:"spentSeconds"`
+}
+
 func (Task) TableName() string {
        return "_devlake_tasks"
 }
+
+func (Subtask) TableName() string {
+       return "_devlake_subtasks"
+}
diff --git a/plugins/core/plugin_task.go b/plugins/core/plugin_task.go
index b2a66be6..ee66becd 100644
--- a/plugins/core/plugin_task.go
+++ b/plugins/core/plugin_task.go
@@ -19,7 +19,6 @@ package core
 
 import (
        "context"
-
        "github.com/apache/incubator-devlake/plugins/core/dal"
        "gorm.io/gorm"
 )
diff --git a/runner/directrun.go b/runner/directrun.go
index 748f5be2..b93dd643 100644
--- a/runner/directrun.go
+++ b/runner/directrun.go
@@ -79,11 +79,13 @@ func DirectRun(cmd *cobra.Command, args []string, 
pluginTask core.PluginTask, op
                panic(err)
        }
        ctx := createContext()
+       var parentTaskID uint64 = 0
        err = RunPluginSubTasks(
                ctx,
                cfg,
                log,
                db,
+               parentTaskID,
                cmd.Use,
                tasks,
                options,
diff --git a/runner/run_task.go b/runner/run_task.go
index b1f783ee..55ec737c 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -38,7 +38,7 @@ import (
 // RunTask FIXME ...
 func RunTask(
        ctx context.Context,
-       cfg *viper.Viper,
+       _ *viper.Viper,
        logger core.Logger,
        db *gorm.DB,
        progress chan core.RunningProgress,
@@ -112,6 +112,7 @@ func RunTask(
                config.GetConfig(),
                logger.Nested(task.Plugin),
                db,
+               task.ID,
                task.Plugin,
                subtasks,
                options,
@@ -126,6 +127,7 @@ func RunPluginTask(
        cfg *viper.Viper,
        logger core.Logger,
        db *gorm.DB,
+       taskID uint64,
        name string,
        subtasks []string,
        options map[string]interface{},
@@ -145,6 +147,7 @@ func RunPluginTask(
                cfg,
                logger,
                db,
+               taskID,
                name,
                subtasks,
                options,
@@ -159,8 +162,9 @@ func RunPluginSubTasks(
        cfg *viper.Viper,
        logger core.Logger,
        db *gorm.DB,
+       taskID uint64,
        name string,
-       subtasks []string,
+       subtaskNames []string,
        options map[string]interface{},
        pluginTask core.PluginTask,
        progress chan core.RunningProgress,
@@ -181,10 +185,10 @@ func RunPluginSubTasks(
        */
 
        // user specifies what subtasks to run
-       if len(subtasks) != 0 {
+       if len(subtaskNames) != 0 {
                // decode user specified subtasks
                var specifiedTasks []string
-               err := mapstructure.Decode(subtasks, &specifiedTasks)
+               err := mapstructure.Decode(subtaskNames, &specifiedTasks)
                if err != nil {
                        return err
                }
@@ -231,7 +235,7 @@ func RunPluginSubTasks(
 
        // execute subtasks in order
        taskCtx.SetProgress(0, steps)
-       i := 0
+       subtaskNumber := 0
        for _, subtaskMeta := range subtaskMetas {
                subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name)
                if err != nil {
@@ -245,15 +249,15 @@ func RunPluginSubTasks(
 
                // run subtask
                logger.Info("executing subtask %s", subtaskMeta.Name)
-               i++
+               subtaskNumber++
                if progress != nil {
                        progress <- core.RunningProgress{
                                Type:          core.SetCurrentSubTask,
                                SubTaskName:   subtaskMeta.Name,
-                               SubTaskNumber: i,
+                               SubTaskNumber: subtaskNumber,
                        }
                }
-               err = subtaskMeta.EntryPoint(subtaskCtx)
+               err = runSubtask(logger, db, taskID, subtaskNumber, subtaskCtx, 
subtaskMeta.EntryPoint)
                if err != nil {
                        return &errors.SubTaskError{
                                SubTaskName: subtaskMeta.Name,
@@ -292,3 +296,33 @@ func UpdateProgressDetail(db *gorm.DB, logger core.Logger, 
taskId uint64, progre
                progressDetail.SubTaskNumber = p.SubTaskNumber
        }
 }
+
+func runSubtask(
+       logger core.Logger,
+       db *gorm.DB,
+       parentID uint64,
+       subtaskNumber int,
+       ctx core.SubTaskContext,
+       entryPoint core.SubTaskEntryPoint,
+) error {
+       beginAt := time.Now()
+       subtask := &models.Subtask{
+               Name:    ctx.GetName(),
+               TaskID:  parentID,
+               Number:  subtaskNumber,
+               BeganAt: &beginAt,
+       }
+       defer func() {
+               finishedAt := time.Now()
+               subtask.FinishedAt = &finishedAt
+               subtask.SpentSeconds = finishedAt.Unix() - beginAt.Unix()
+               recordSubtask(logger, db, subtask)
+       }()
+       return entryPoint(ctx)
+}
+
+func recordSubtask(logger core.Logger, db *gorm.DB, subtask *models.Subtask) {
+       if err := db.Create(&subtask).Error; err != nil {
+               logger.Error("error writing subtask %d status to DB: %v", 
subtask.ID, err)
+       }
+}

Reply via email to