This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new f8f3cdeb [feat-2433]: record execution time for plugin subtasks (#2455)
f8f3cdeb is described below
commit f8f3cdeb72c12ce2d6a6de9bc9041125a2d8214f
Author: Keon Amini <[email protected]>
AuthorDate: Tue Aug 2 20:32:54 2022 -0500
[feat-2433]: record execution time for plugin subtasks (#2455)
* feat(framework): add execution time for plugin subtasks (#2433)
* feat: add subtask number to subtask model
* feat: Remove batched saving in favor of sequential
closes #2433
---
.../20220711_add_subtasks_table.go | 39 +++++++++++++++++
models/migrationscripts/register.go | 1 +
models/task.go | 14 ++++++
plugins/core/plugin_task.go | 1 -
runner/directrun.go | 2 +
runner/run_task.go | 50 ++++++++++++++++++----
6 files changed, 98 insertions(+), 9 deletions(-)
diff --git a/models/migrationscripts/20220711_add_subtasks_table.go
b/models/migrationscripts/20220711_add_subtasks_table.go
new file mode 100644
index 00000000..3798289f
--- /dev/null
+++ b/models/migrationscripts/20220711_add_subtasks_table.go
@@ -0,0 +1,39 @@
+package migrationscripts
+
+import (
+ "context"
+ commonArchived
"github.com/apache/incubator-devlake/models/migrationscripts/archived"
+ "gorm.io/gorm"
+ "time"
+)
+
+type addSubtasksTable struct {
+}
+
+// Subtask20220711 DB snapshot model of models.Subtask
+type Subtask20220711 struct {
+ commonArchived.Model
+ TaskID uint64 `json:"task_id" gorm:"index"`
+ SubtaskName string `json:"name" gorm:"column:name;index"`
+ Number int `json:"number"`
+ BeganAt *time.Time `json:"beganAt"`
+ FinishedAt *time.Time `json:"finishedAt" gorm:"index"`
+ SpentSeconds int64 `json:"spentSeconds"`
+}
+
+func (s Subtask20220711) TableName() string {
+ return "_devlake_subtasks"
+}
+
+func (u addSubtasksTable) Up(_ context.Context, db *gorm.DB) error {
+ err := db.Migrator().AutoMigrate(&Subtask20220711{})
+ return err
+}
+
+func (u addSubtasksTable) Version() uint64 {
+ return 20220711000001
+}
+
+func (u addSubtasksTable) Name() string {
+ return "create subtask schema"
+}
diff --git a/models/migrationscripts/register.go
b/models/migrationscripts/register.go
index c49fab8e..47591a18 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -33,5 +33,6 @@ func All() []migration.Script {
new(removeNotes),
new(addProjectMapping),
new(addNoPKModelToCommitParent),
+ new(addSubtasksTable),
}
}
diff --git a/models/task.go b/models/task.go
index 9f78d6cf..12005763 100644
--- a/models/task.go
+++ b/models/task.go
@@ -68,6 +68,20 @@ type NewTask struct {
PipelineCol int `json:"-"`
}
+type Subtask struct {
+ common.Model
+ TaskID uint64 `json:"task_id" gorm:"index"`
+ Name string `json:"name" gorm:"index"`
+ Number int `json:"number"`
+ BeganAt *time.Time `json:"beganAt"`
+ FinishedAt *time.Time `json:"finishedAt" gorm:"index"`
+ SpentSeconds int64 `json:"spentSeconds"`
+}
+
func (Task) TableName() string {
return "_devlake_tasks"
}
+
+func (Subtask) TableName() string {
+ return "_devlake_subtasks"
+}
diff --git a/plugins/core/plugin_task.go b/plugins/core/plugin_task.go
index b2a66be6..ee66becd 100644
--- a/plugins/core/plugin_task.go
+++ b/plugins/core/plugin_task.go
@@ -19,7 +19,6 @@ package core
import (
"context"
-
"github.com/apache/incubator-devlake/plugins/core/dal"
"gorm.io/gorm"
)
diff --git a/runner/directrun.go b/runner/directrun.go
index 748f5be2..b93dd643 100644
--- a/runner/directrun.go
+++ b/runner/directrun.go
@@ -79,11 +79,13 @@ func DirectRun(cmd *cobra.Command, args []string,
pluginTask core.PluginTask, op
panic(err)
}
ctx := createContext()
+ var parentTaskID uint64 = 0
err = RunPluginSubTasks(
ctx,
cfg,
log,
db,
+ parentTaskID,
cmd.Use,
tasks,
options,
diff --git a/runner/run_task.go b/runner/run_task.go
index b1f783ee..55ec737c 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -38,7 +38,7 @@ import (
// RunTask FIXME ...
func RunTask(
ctx context.Context,
- cfg *viper.Viper,
+ _ *viper.Viper,
logger core.Logger,
db *gorm.DB,
progress chan core.RunningProgress,
@@ -112,6 +112,7 @@ func RunTask(
config.GetConfig(),
logger.Nested(task.Plugin),
db,
+ task.ID,
task.Plugin,
subtasks,
options,
@@ -126,6 +127,7 @@ func RunPluginTask(
cfg *viper.Viper,
logger core.Logger,
db *gorm.DB,
+ taskID uint64,
name string,
subtasks []string,
options map[string]interface{},
@@ -145,6 +147,7 @@ func RunPluginTask(
cfg,
logger,
db,
+ taskID,
name,
subtasks,
options,
@@ -159,8 +162,9 @@ func RunPluginSubTasks(
cfg *viper.Viper,
logger core.Logger,
db *gorm.DB,
+ taskID uint64,
name string,
- subtasks []string,
+ subtaskNames []string,
options map[string]interface{},
pluginTask core.PluginTask,
progress chan core.RunningProgress,
@@ -181,10 +185,10 @@ func RunPluginSubTasks(
*/
// user specifies what subtasks to run
- if len(subtasks) != 0 {
+ if len(subtaskNames) != 0 {
// decode user specified subtasks
var specifiedTasks []string
- err := mapstructure.Decode(subtasks, &specifiedTasks)
+ err := mapstructure.Decode(subtaskNames, &specifiedTasks)
if err != nil {
return err
}
@@ -231,7 +235,7 @@ func RunPluginSubTasks(
// execute subtasks in order
taskCtx.SetProgress(0, steps)
- i := 0
+ subtaskNumber := 0
for _, subtaskMeta := range subtaskMetas {
subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name)
if err != nil {
@@ -245,15 +249,15 @@ func RunPluginSubTasks(
// run subtask
logger.Info("executing subtask %s", subtaskMeta.Name)
- i++
+ subtaskNumber++
if progress != nil {
progress <- core.RunningProgress{
Type: core.SetCurrentSubTask,
SubTaskName: subtaskMeta.Name,
- SubTaskNumber: i,
+ SubTaskNumber: subtaskNumber,
}
}
- err = subtaskMeta.EntryPoint(subtaskCtx)
+ err = runSubtask(logger, db, taskID, subtaskNumber, subtaskCtx,
subtaskMeta.EntryPoint)
if err != nil {
return &errors.SubTaskError{
SubTaskName: subtaskMeta.Name,
@@ -292,3 +296,33 @@ func UpdateProgressDetail(db *gorm.DB, logger core.Logger,
taskId uint64, progre
progressDetail.SubTaskNumber = p.SubTaskNumber
}
}
+
+func runSubtask(
+ logger core.Logger,
+ db *gorm.DB,
+ parentID uint64,
+ subtaskNumber int,
+ ctx core.SubTaskContext,
+ entryPoint core.SubTaskEntryPoint,
+) error {
+ beginAt := time.Now()
+ subtask := &models.Subtask{
+ Name: ctx.GetName(),
+ TaskID: parentID,
+ Number: subtaskNumber,
+ BeganAt: &beginAt,
+ }
+ defer func() {
+ finishedAt := time.Now()
+ subtask.FinishedAt = &finishedAt
+ subtask.SpentSeconds = finishedAt.Unix() - beginAt.Unix()
+ recordSubtask(logger, db, subtask)
+ }()
+ return entryPoint(ctx)
+}
+
+func recordSubtask(logger core.Logger, db *gorm.DB, subtask *models.Subtask) {
+ if err := db.Create(&subtask).Error; err != nil {
+ logger.Error("error writing subtask %d status to DB: %v",
subtask.ID, err)
+ }
+}