This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new b7fa6648 feat(dora): connect issue to deploy
b7fa6648 is described below
commit b7fa6648d4cfe3a727704e2baa50c16576215758
Author: Yingchu Chen <[email protected]>
AuthorDate: Mon Sep 12 12:32:53 2022 +0800
feat(dora): connect issue to deploy
closes #2879
---
plugins/ae/tasks/commits_convertor.go | 2 +-
plugins/dora/impl/impl.go | 2 +-
plugins/dora/tasks/change_lead_time_convertor.go | 60 -----------
plugins/dora/tasks/issue_deploy_connector.go | 100 ++++++++++++++++++
plugins/dora/tasks/task_data.go | 5 +-
plugins/gitlab/tasks/commit_convertor.go | 2 +-
plugins/gitlab/tasks/mr_comment_convertor.go | 2 +-
plugins/gitlab/tasks/mr_commit_convertor.go | 2 +-
plugins/gitlab/tasks/mr_convertor.go | 2 +-
plugins/gitlab/tasks/project_convertor.go | 2 +-
plugins/helper/batch_save.go | 50 +++++----
plugins/helper/batch_save_divider.go | 78 ++++++++++++--
plugins/helper/data_enricher.go | 129 +++++++++++++++++++++++
13 files changed, 340 insertions(+), 96 deletions(-)
diff --git a/plugins/ae/tasks/commits_convertor.go
b/plugins/ae/tasks/commits_convertor.go
index 3938835c..136f537f 100644
--- a/plugins/ae/tasks/commits_convertor.go
+++ b/plugins/ae/tasks/commits_convertor.go
@@ -73,6 +73,6 @@ var ConvertCommitsMeta = core.SubTaskMeta{
Name: "convertCommits",
EntryPoint: ConvertCommits,
EnabledByDefault: true,
- Description: "Update domain layer commits dev_eq field according
to ae_commits",
+ Description: "Add domain layer commits dev_eq field according to
ae_commits",
DomainTypes: []string{core.DOMAIN_TYPE_CODE},
}
diff --git a/plugins/dora/impl/impl.go b/plugins/dora/impl/impl.go
index 222ec05f..bef74d6f 100644
--- a/plugins/dora/impl/impl.go
+++ b/plugins/dora/impl/impl.go
@@ -58,7 +58,7 @@ func (plugin Dora) Init(config *viper.Viper, logger
core.Logger, db *gorm.DB) er
func (plugin Dora) SubTaskMetas() []core.SubTaskMeta {
// TODO add your sub task here
return []core.SubTaskMeta{
- //tasks.ConvertChangeLeadTimeMeta,
+ tasks.ConnectIssueDeployMeta,
}
}
diff --git a/plugins/dora/tasks/change_lead_time_convertor.go
b/plugins/dora/tasks/change_lead_time_convertor.go
deleted file mode 100644
index 21b158c1..00000000
--- a/plugins/dora/tasks/change_lead_time_convertor.go
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package tasks
-
-import (
- "github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/helper"
-)
-
-var ConvertChangeLeadTimeMeta = core.SubTaskMeta{
- Name: "ConvertChangeLeadTime",
- EntryPoint: ConvertChangeLeadTime,
- EnabledByDefault: true,
- Description: "TODO",
- DomainTypes: []string{core.DOMAIN_TYPE_CICD},
-}
-
-const RAW_ISSUES_TABLE = `dora_issues`
-
-func ConvertChangeLeadTime(taskCtx core.SubTaskContext) error {
- //db := taskCtx.GetDal()
- //data := taskCtx.GetData().(*DoraTaskData)
-
- converter, err := helper.NewDataConverter(helper.DataConverterArgs{
- RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
- Ctx: taskCtx,
- Params: DoraApiParams{
- // TODO
- },
- Table: RAW_ISSUES_TABLE,
- },
- //InputRowType: reflect.TypeOf(githubModels.GithubJob{}),
- //Input: cursor,
- Convert: func(inputRow interface{}) ([]interface{}, error) {
- // TODO
-
- return []interface{}{}, nil
- },
- })
- if err != nil {
- return err
- }
-
- return converter.Execute()
-}
diff --git a/plugins/dora/tasks/issue_deploy_connector.go
b/plugins/dora/tasks/issue_deploy_connector.go
new file mode 100644
index 00000000..2681df60
--- /dev/null
+++ b/plugins/dora/tasks/issue_deploy_connector.go
@@ -0,0 +1,100 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "github.com/apache/incubator-devlake/models/domainlayer/devops"
+ "github.com/apache/incubator-devlake/models/domainlayer/ticket"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
+ "github.com/apache/incubator-devlake/plugins/helper"
+ "reflect"
+)
+
+var ConnectIssueDeployMeta = core.SubTaskMeta{
+ Name: "ConnectIssueDeploy",
+ EntryPoint: ConnectIssueDeploy,
+ EnabledByDefault: true,
+ Description: "TODO",
+ DomainTypes: []string{core.DOMAIN_TYPE_CICD},
+}
+
+const RAW_ISSUES_TABLE = `dora_issues`
+
+func ConnectIssueDeploy(taskCtx core.SubTaskContext) error {
+ db := taskCtx.GetDal()
+ data := taskCtx.GetData().(*DoraTaskData)
+
+ issue := &ticket.Issue{}
+ // select all issues belongs to the board
+ clauses := []dal.Clause{
+ dal.From(issue),
+ dal.Join(`left join board_issues
+ on issues.id = board_issues.issue_id`),
+ dal.Join("left join board_repos on board_repos.board_id =
board_issues.board_id"),
+ dal.Where(
+ "board_repos.repo_id = ? and issues.type = ?",
+ data.Options.RepoId, "Incident",
+ ),
+ }
+ cursor, err := db.Cursor(clauses...)
+ if err != nil {
+ return err
+ }
+ defer cursor.Close()
+
+ enricher, err := helper.NewDataEnricher(helper.DataEnricherArgs{
+ RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+ Ctx: taskCtx,
+ Params: DoraApiParams{
+ // TODO
+ },
+ Table: "issues",
+ },
+ InputRowType: reflect.TypeOf(ticket.Issue{}),
+ Input: cursor,
+ Enrich: func(inputRow interface{}) ([]interface{}, error) {
+ issueToBeUpdate := inputRow.(*ticket.Issue)
+ cicdTask := &devops.CICDTask{}
+ cicdTakClauses := []dal.Clause{
+ dal.From(cicdTask),
+ dal.Join(`left join cicd_pipelines
+ on cicd_pipelines.id = cicd_tasks.pipeline_id`),
+ dal.Join("left join cicd_pipeline_repos on
cicd_pipelines.id = cicd_pipeline_repos.id"),
+ dal.Where(
+ `cicd_pipeline_repos.repo = ? and
cicd_tasks.finished_date < ?
+ and
cicd_tasks.result = ? and cicd_tasks.type = ?`,
+ data.Options.RepoId,
issueToBeUpdate.CreatedDate, "SUCCESS", "DEPLOY",
+ ),
+ dal.Orderby("cicd_tasks.finished_date DESC"),
+ }
+ err = db.First(cicdTask, cicdTakClauses...)
+ if err != nil {
+ return nil, err
+ }
+ issueToBeUpdate.DeploymentId = cicdTask.Id
+
+ return []interface{}{issueToBeUpdate}, nil
+ },
+ })
+ if err != nil {
+ return err
+ }
+
+ return enricher.Execute()
+}
diff --git a/plugins/dora/tasks/task_data.go b/plugins/dora/tasks/task_data.go
index 6ef125a2..ebae0c34 100644
--- a/plugins/dora/tasks/task_data.go
+++ b/plugins/dora/tasks/task_data.go
@@ -25,8 +25,9 @@ type DoraApiParams struct {
}
type DoraOptions struct {
- Tasks []string `json:"tasks,omitempty"`
- Since string
+ Tasks []string `json:"tasks,omitempty"`
+ Since string
+ RepoId string
}
type DoraTaskData struct {
diff --git a/plugins/gitlab/tasks/commit_convertor.go
b/plugins/gitlab/tasks/commit_convertor.go
index 2dbe61df..0fd5d230 100644
--- a/plugins/gitlab/tasks/commit_convertor.go
+++ b/plugins/gitlab/tasks/commit_convertor.go
@@ -33,7 +33,7 @@ var ConvertCommitsMeta = core.SubTaskMeta{
Name: "convertApiCommits",
EntryPoint: ConvertApiCommits,
EnabledByDefault: false,
- Description: "Update domain layer commit according to
GitlabCommit",
+ Description: "Add domain layer commit according to GitlabCommit",
DomainTypes: []string{core.DOMAIN_TYPE_CODE},
}
diff --git a/plugins/gitlab/tasks/mr_comment_convertor.go
b/plugins/gitlab/tasks/mr_comment_convertor.go
index bb3206cb..66daf449 100644
--- a/plugins/gitlab/tasks/mr_comment_convertor.go
+++ b/plugins/gitlab/tasks/mr_comment_convertor.go
@@ -34,7 +34,7 @@ var ConvertMrCommentMeta = core.SubTaskMeta{
Name: "convertMergeRequestComment",
EntryPoint: ConvertMergeRequestComment,
EnabledByDefault: true,
- Description: "Update domain layer Comment according to
GitlabMrComment",
+ Description: "Add domain layer Comment according to
GitlabMrComment",
DomainTypes: []string{core.DOMAIN_TYPE_CODE_REVIEW},
}
diff --git a/plugins/gitlab/tasks/mr_commit_convertor.go
b/plugins/gitlab/tasks/mr_commit_convertor.go
index 68b5eb7a..13cc6c7d 100644
--- a/plugins/gitlab/tasks/mr_commit_convertor.go
+++ b/plugins/gitlab/tasks/mr_commit_convertor.go
@@ -32,7 +32,7 @@ var ConvertApiMrCommitsMeta = core.SubTaskMeta{
Name: "convertApiMergeRequestsCommits",
EntryPoint: ConvertApiMergeRequestsCommits,
EnabledByDefault: true,
- Description: "Update domain layer PullRequestCommit according to
GitlabMrCommit",
+ Description: "Add domain layer PullRequestCommit according to
GitlabMrCommit",
DomainTypes: []string{core.DOMAIN_TYPE_CODE_REVIEW},
}
diff --git a/plugins/gitlab/tasks/mr_convertor.go
b/plugins/gitlab/tasks/mr_convertor.go
index 4d0e7c11..53e0aac2 100644
--- a/plugins/gitlab/tasks/mr_convertor.go
+++ b/plugins/gitlab/tasks/mr_convertor.go
@@ -33,7 +33,7 @@ var ConvertApiMergeRequestsMeta = core.SubTaskMeta{
Name: "convertApiMergeRequests",
EntryPoint: ConvertApiMergeRequests,
EnabledByDefault: true,
- Description: "Update domain layer PullRequest according to
GitlabMergeRequest",
+ Description: "Add domain layer PullRequest according to
GitlabMergeRequest",
DomainTypes: []string{core.DOMAIN_TYPE_CODE_REVIEW},
}
diff --git a/plugins/gitlab/tasks/project_convertor.go
b/plugins/gitlab/tasks/project_convertor.go
index 59d271e3..62baaf07 100644
--- a/plugins/gitlab/tasks/project_convertor.go
+++ b/plugins/gitlab/tasks/project_convertor.go
@@ -36,7 +36,7 @@ var ConvertProjectMeta = core.SubTaskMeta{
Name: "convertApiProject",
EntryPoint: ConvertApiProjects,
EnabledByDefault: true,
- Description: "Update domain layer Repo according to GitlabProject",
+ Description: "Add domain layer Repo according to GitlabProject",
DomainTypes: []string{core.DOMAIN_TYPE_CODE,
core.DOMAIN_TYPE_TICKET},
}
diff --git a/plugins/helper/batch_save.go b/plugins/helper/batch_save.go
index d6990a3f..bb37e19c 100644
--- a/plugins/helper/batch_save.go
+++ b/plugins/helper/batch_save.go
@@ -27,8 +27,8 @@ import (
"github.com/apache/incubator-devlake/plugins/core/dal"
)
-// BatchSave performs mulitple records persistence of a specific type in one
sql query to improve the performance
-type BatchSave struct {
+// BatchShared is the base of BatchSave&BatchUpdate
+type BatchShared struct {
basicRes core.BasicRes
log core.Logger
db dal.Dal
@@ -42,13 +42,18 @@ type BatchSave struct {
primaryKey []reflect.StructField
}
+// BatchSave performs multiple records persistence of a specific type in one
sql query to improve the performance
+type BatchSave struct {
+ *BatchShared
+}
+
// BatchUpdate will update records by batch
type BatchUpdate struct {
- *BatchSave
+ *BatchShared
}
-// NewBatchSave creates a new BatchSave instance
-func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int)
(*BatchSave, error) {
+// NewBatchShared creates a new BatchShared instance to be used in
BatchSave&BatchUpdate
+func NewBatchShared(basicRes core.BasicRes, slotType reflect.Type, size int)
(*BatchShared, error) {
if slotType.Kind() != reflect.Ptr {
return nil, errors.Default.New("slotType must be a pointer")
}
@@ -60,7 +65,7 @@ func NewBatchSave(basicRes core.BasicRes, slotType
reflect.Type, size int) (*Bat
}
log := basicRes.GetLogger().Nested(slotType.String())
- return &BatchSave{
+ batchShared := &BatchShared{
basicRes: basicRes,
log: log,
db: db,
@@ -69,19 +74,26 @@ func NewBatchSave(basicRes core.BasicRes, slotType
reflect.Type, size int) (*Bat
size: size,
valueIndex: make(map[string]int),
primaryKey: primaryKey,
- }, nil
+ }
+ return batchShared, nil
+}
+
+// NewBatchSave creates a new BatchSave instance
+func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int)
(*BatchSave, error) {
+ batchShared, err := NewBatchShared(basicRes, slotType, size)
+ if err != nil {
+ return nil, err
+ }
+ return &BatchSave{batchShared}, nil
}
// NewBatchUpdate creates a new BatchUpdate instance
func NewBatchUpdate(basicRes core.BasicRes, slotType reflect.Type, size int)
(*BatchUpdate, error) {
- batchSave, err := NewBatchSave(basicRes, slotType, size)
+ batchShared, err := NewBatchShared(basicRes, slotType, size)
if err != nil {
return nil, err
}
- batchUpdate := BatchUpdate{
- BatchSave: batchSave,
- }
- return &batchUpdate, nil
+ return &BatchUpdate{batchShared}, nil
}
// Add record to cache. BatchSave would flush them into Database when cache is
max out
@@ -99,22 +111,22 @@ func (c *BatchSave) Add(slot interface{}) error {
return nil
}
-// Update record to cache. BatchSave would flush them into Database when cache
is max out
-func (c *BatchUpdate) Update(slot interface{}) error {
+// Add record to cache. BatchUpdate would flush them into the database when the
cache is maxed out
+func (c *BatchUpdate) Add(slot interface{}) error {
err := c.prepareForFlush(slot)
if err != nil {
return err
}
// flush out into database if max outed
if c.current == c.size {
- return c.FlushUpdate()
+ return c.Flush()
} else if c.current%100 == 0 {
c.log.Debug("batch save current: %d", c.current)
}
return nil
}
-func (c *BatchSave) prepareForFlush(slot interface{}) error {
+func (c *BatchShared) prepareForFlush(slot interface{}) error {
// type checking
if reflect.TypeOf(slot) != c.slotType {
return errors.Default.New("sub cache type mismatched")
@@ -150,8 +162,8 @@ func (c *BatchSave) Flush() error {
return nil
}
-// FlushUpdate update cached records into database
-func (c *BatchUpdate) FlushUpdate() error {
+// Flush updates cached records into the database
+func (c *BatchUpdate) Flush() error {
err := c.db.UpdateColumns(c.slots.Slice(0, c.current).Interface())
if err != nil {
return err
@@ -173,7 +185,7 @@ func (c *BatchSave) Close() error {
// Close would flash the cache and release resources
func (c *BatchUpdate) Close() error {
if c.current > 0 {
- return c.FlushUpdate()
+ return c.Flush()
}
return nil
}
diff --git a/plugins/helper/batch_save_divider.go
b/plugins/helper/batch_save_divider.go
index 57549a10..0555c986 100644
--- a/plugins/helper/batch_save_divider.go
+++ b/plugins/helper/batch_save_divider.go
@@ -27,30 +27,65 @@ import (
"github.com/apache/incubator-devlake/plugins/core/dal"
)
-// BatchSaveDivider creates and caches BatchSave, this is helpful when dealing
with massive amount of data records
-// with arbitrary types.
-type BatchSaveDivider struct {
+// BatchDivider is the base struct of BatchSaveDivider&BatchUpdateDivider
+type BatchDivider struct {
basicRes core.BasicRes
- log core.Logger
db dal.Dal
- batches map[reflect.Type]*BatchSave
batchSize int
table string
params string
}
+// BatchSaveDivider creates and caches BatchSave, this is helpful when dealing
with massive amount of data records
+// with arbitrary types.
+type BatchSaveDivider struct {
+ *BatchDivider
+ log core.Logger
+ batches map[reflect.Type]*BatchSave
+}
+
+// BatchUpdateDivider creates and caches BatchUpdate; this is helpful when
+// dealing with a massive amount of data records with arbitrary types.
+type BatchUpdateDivider struct {
+ *BatchDivider
+ log core.Logger
+ batches map[reflect.Type]*BatchUpdate
+}
+
// NewBatchSaveDivider create a new BatchInsertDivider instance
func NewBatchSaveDivider(basicRes core.BasicRes, batchSize int, table string,
params string) *BatchSaveDivider {
log := basicRes.GetLogger().Nested("batch divider")
- return &BatchSaveDivider{
+ batchDivider := &BatchDivider{
basicRes: basicRes,
- log: log,
db: basicRes.GetDal(),
- batches: make(map[reflect.Type]*BatchSave),
batchSize: batchSize,
table: table,
params: params,
}
+ batchSaveDivider := &BatchSaveDivider{
+ log: log,
+ batches: make(map[reflect.Type]*BatchSave),
+ }
+ batchSaveDivider.BatchDivider = batchDivider
+ return batchSaveDivider
+}
+
+// NewBatchUpdateDivider creates a new BatchUpdateDivider instance
+func NewBatchUpdateDivider(basicRes core.BasicRes, batchSize int, table
string, params string) *BatchUpdateDivider {
+ log := basicRes.GetLogger().Nested("batch update divider")
+ batchDivider := &BatchDivider{
+ basicRes: basicRes,
+ db: basicRes.GetDal(),
+ batchSize: batchSize,
+ table: table,
+ params: params,
+ }
+ batchUpdateDivider := &BatchUpdateDivider{
+ log: log,
+ batches: make(map[reflect.Type]*BatchUpdate),
+ }
+ batchUpdateDivider.BatchDivider = batchDivider
+ return batchUpdateDivider
}
// ForType returns a `BatchSave` instance for specific type
@@ -97,3 +132,30 @@ func (d *BatchSaveDivider) Close() error {
}
return nil
}
+
+// ForType returns a `BatchUpdate` instance for specific type
+func (d *BatchUpdateDivider) ForType(rowType reflect.Type) (*BatchUpdate,
error) {
+ // get the cache for the specific type
+ batch := d.batches[rowType]
+ var err error
+ // create one if not exists
+ if batch == nil {
+ batch, err = NewBatchUpdate(d.basicRes, rowType, d.batchSize)
+ if err != nil {
+ return nil, err
+ }
+ d.batches[rowType] = batch
+ }
+ return batch, nil
+}
+
+// Close all batches so the remaining records get updated into the db
+func (d *BatchUpdateDivider) Close() error {
+ for _, batch := range d.batches {
+ err := batch.Close()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/plugins/helper/data_enricher.go b/plugins/helper/data_enricher.go
new file mode 100644
index 00000000..e2f37d70
--- /dev/null
+++ b/plugins/helper/data_enricher.go
@@ -0,0 +1,129 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package helper
+
+import (
+ "database/sql"
+ "github.com/apache/incubator-devlake/errors"
+ "reflect"
+
+ "github.com/apache/incubator-devlake/plugins/core"
+)
+
+// DataEnrichHandler accepts a row from the source cursor and returns the list
of entities that need to be stored
+type DataEnrichHandler func(row interface{}) ([]interface{}, error)
+
+// DataEnricherArgs includes the arguments about DataEnricher.
+// This will be used in Creating a DataEnricher.
+//
+// DataEnricherArgs {
+// InputRowType: type of inputRow ,
+// Input: dal cursor,
+// RawDataSubTaskArgs: args about raw data task
+// Enrich: main function
including conversion logic
+// BatchSize: batch size
+type DataEnricherArgs struct {
+ RawDataSubTaskArgs
+ // InputRowType is the type of each row read from the Input cursor
+ InputRowType reflect.Type
+ Input *sql.Rows
+ Enrich DataEnrichHandler
+ BatchSize int
+}
+
+// DataEnricher helps you convert Data from Tool Layer Tables to Domain Layer
Tables
+// It reads rows from the specified Input cursor and feeds them into the
`Enrich` handler;
+// you can return arbitrary domain layer entities from this handler, and
DataEnricher would
+// perform a batch update operation for you.
+type DataEnricher struct {
+ *RawDataSubTask
+ args *DataEnricherArgs
+}
+
+// NewDataEnricher function helps you create a DataEnricher using
DataEnricherArgs.
+// You can see the usage in plugins/dora/tasks/issue_deploy_connector.go.
+func NewDataEnricher(args DataEnricherArgs) (*DataEnricher, error) {
+ rawDataSubTask, err := newRawDataSubTask(args.RawDataSubTaskArgs)
+ if err != nil {
+ return nil, err
+ }
+ // process args
+ if args.BatchSize == 0 {
+ args.BatchSize = 500
+ }
+ return &DataEnricher{
+ RawDataSubTask: rawDataSubTask,
+ args: &args,
+ }, nil
+}
+
+// Execute function implements Subtask interface.
+// It loads data from Tool Layer Tables using `Ctx.GetDal()`, converts data
using the `enricher.args.Enrich` handler,
+// then saves data to Domain Layer Tables using BatchUpdateDivider
+func (enricher *DataEnricher) Execute() error {
+ // load data from database
+ db := enricher.args.Ctx.GetDal()
+
+ divider := NewBatchUpdateDivider(enricher.args.Ctx,
enricher.args.BatchSize, enricher.table, enricher.params)
+
+ // set progress
+ enricher.args.Ctx.SetProgress(0, -1)
+
+ cursor := enricher.args.Input
+ defer cursor.Close()
+ ctx := enricher.args.Ctx.GetContext()
+ // iterate all rows
+ for cursor.Next() {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+ inputRow := reflect.New(enricher.args.InputRowType).Interface()
+ err := db.Fetch(cursor, inputRow)
+ if err != nil {
+ return errors.Default.Wrap(err, "error fetching rows",
errors.UserMessage("Internal Enricher execution error"))
+ }
+
+ results, err := enricher.args.Enrich(inputRow)
+ if err != nil {
+ return errors.Default.Wrap(err, "error calling Enricher
plugin implementation", errors.UserMessage("Internal Enricher execution error"))
+ }
+
+ for _, result := range results {
+ // get the batch operator for the specific type
+ batch, err := divider.ForType(reflect.TypeOf(result))
+ if err != nil {
+ return errors.Default.Wrap(err, "error getting
batch from result", errors.UserMessage("Internal Enricher execution error"))
+ }
+ // records get saved into db when slots were max outed
+ err = batch.Add(result)
+ if err != nil {
+ return errors.Default.Wrap(err, "error updating
result to batch", errors.UserMessage("Internal Enricher execution error"))
+ }
+ }
+ enricher.args.Ctx.IncProgress(1)
+ }
+
+ // save the last batches
+ return divider.Close()
+}
+
+// Check if DataEnricher implements SubTask interface
+var _ core.SubTask = (*DataEnricher)(nil)