This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new b7fa6648 feat(dora): connect issue to deploy
b7fa6648 is described below

commit b7fa6648d4cfe3a727704e2baa50c16576215758
Author: Yingchu Chen <[email protected]>
AuthorDate: Mon Sep 12 12:32:53 2022 +0800

    feat(dora): connect issue to deploy
    
    closes #2879
---
 plugins/ae/tasks/commits_convertor.go            |   2 +-
 plugins/dora/impl/impl.go                        |   2 +-
 plugins/dora/tasks/change_lead_time_convertor.go |  60 -----------
 plugins/dora/tasks/issue_deploy_connector.go     | 100 ++++++++++++++++++
 plugins/dora/tasks/task_data.go                  |   5 +-
 plugins/gitlab/tasks/commit_convertor.go         |   2 +-
 plugins/gitlab/tasks/mr_comment_convertor.go     |   2 +-
 plugins/gitlab/tasks/mr_commit_convertor.go      |   2 +-
 plugins/gitlab/tasks/mr_convertor.go             |   2 +-
 plugins/gitlab/tasks/project_convertor.go        |   2 +-
 plugins/helper/batch_save.go                     |  50 +++++----
 plugins/helper/batch_save_divider.go             |  78 ++++++++++++--
 plugins/helper/data_enricher.go                  | 129 +++++++++++++++++++++++
 13 files changed, 340 insertions(+), 96 deletions(-)

diff --git a/plugins/ae/tasks/commits_convertor.go 
b/plugins/ae/tasks/commits_convertor.go
index 3938835c..136f537f 100644
--- a/plugins/ae/tasks/commits_convertor.go
+++ b/plugins/ae/tasks/commits_convertor.go
@@ -73,6 +73,6 @@ var ConvertCommitsMeta = core.SubTaskMeta{
        Name:             "convertCommits",
        EntryPoint:       ConvertCommits,
        EnabledByDefault: true,
-       Description:      "Update domain layer commits dev_eq field according 
to ae_commits",
+       Description:      "Add domain layer commits dev_eq field according to 
ae_commits",
        DomainTypes:      []string{core.DOMAIN_TYPE_CODE},
 }
diff --git a/plugins/dora/impl/impl.go b/plugins/dora/impl/impl.go
index 222ec05f..bef74d6f 100644
--- a/plugins/dora/impl/impl.go
+++ b/plugins/dora/impl/impl.go
@@ -58,7 +58,7 @@ func (plugin Dora) Init(config *viper.Viper, logger 
core.Logger, db *gorm.DB) er
 func (plugin Dora) SubTaskMetas() []core.SubTaskMeta {
        // TODO add your sub task here
        return []core.SubTaskMeta{
-               //tasks.ConvertChangeLeadTimeMeta,
+               tasks.ConnectIssueDeployMeta,
        }
 }
 
diff --git a/plugins/dora/tasks/change_lead_time_convertor.go 
b/plugins/dora/tasks/change_lead_time_convertor.go
deleted file mode 100644
index 21b158c1..00000000
--- a/plugins/dora/tasks/change_lead_time_convertor.go
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package tasks
-
-import (
-       "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/helper"
-)
-
-var ConvertChangeLeadTimeMeta = core.SubTaskMeta{
-       Name:             "ConvertChangeLeadTime",
-       EntryPoint:       ConvertChangeLeadTime,
-       EnabledByDefault: true,
-       Description:      "TODO",
-       DomainTypes:      []string{core.DOMAIN_TYPE_CICD},
-}
-
-const RAW_ISSUES_TABLE = `dora_issues`
-
-func ConvertChangeLeadTime(taskCtx core.SubTaskContext) error {
-       //db := taskCtx.GetDal()
-       //data := taskCtx.GetData().(*DoraTaskData)
-
-       converter, err := helper.NewDataConverter(helper.DataConverterArgs{
-               RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
-                       Ctx:    taskCtx,
-                       Params: DoraApiParams{
-                               // TODO
-                       },
-                       Table: RAW_ISSUES_TABLE,
-               },
-               //InputRowType: reflect.TypeOf(githubModels.GithubJob{}),
-               //Input:        cursor,
-               Convert: func(inputRow interface{}) ([]interface{}, error) {
-                       // TODO
-
-                       return []interface{}{}, nil
-               },
-       })
-       if err != nil {
-               return err
-       }
-
-       return converter.Execute()
-}
diff --git a/plugins/dora/tasks/issue_deploy_connector.go 
b/plugins/dora/tasks/issue_deploy_connector.go
new file mode 100644
index 00000000..2681df60
--- /dev/null
+++ b/plugins/dora/tasks/issue_deploy_connector.go
@@ -0,0 +1,100 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+       "github.com/apache/incubator-devlake/models/domainlayer/devops"
+       "github.com/apache/incubator-devlake/models/domainlayer/ticket"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
+       "github.com/apache/incubator-devlake/plugins/helper"
+       "reflect"
+)
+
+var ConnectIssueDeployMeta = core.SubTaskMeta{
+       Name:             "ConnectIssueDeploy",
+       EntryPoint:       ConnectIssueDeploy,
+       EnabledByDefault: true,
+       Description:      "TODO",
+       DomainTypes:      []string{core.DOMAIN_TYPE_CICD},
+}
+
+const RAW_ISSUES_TABLE = `dora_issues`
+
+func ConnectIssueDeploy(taskCtx core.SubTaskContext) error {
+       db := taskCtx.GetDal()
+       data := taskCtx.GetData().(*DoraTaskData)
+
+       issue := &ticket.Issue{}
+       // select all issues that belong to the board
+       clauses := []dal.Clause{
+               dal.From(issue),
+               dal.Join(`left join board_issues 
+                       on issues.id = board_issues.issue_id`),
+               dal.Join("left join board_repos on board_repos.board_id = 
board_issues.board_id"),
+               dal.Where(
+                       "board_repos.repo_id = ? and issues.type = ?",
+                       data.Options.RepoId, "Incident",
+               ),
+       }
+       cursor, err := db.Cursor(clauses...)
+       if err != nil {
+               return err
+       }
+       defer cursor.Close()
+
+       enricher, err := helper.NewDataEnricher(helper.DataEnricherArgs{
+               RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+                       Ctx:    taskCtx,
+                       Params: DoraApiParams{
+                               // TODO
+                       },
+                       Table: "issues",
+               },
+               InputRowType: reflect.TypeOf(ticket.Issue{}),
+               Input:        cursor,
+               Enrich: func(inputRow interface{}) ([]interface{}, error) {
+                       issueToBeUpdate := inputRow.(*ticket.Issue)
+                       cicdTask := &devops.CICDTask{}
+                       cicdTakClauses := []dal.Clause{
+                               dal.From(cicdTask),
+                               dal.Join(`left join cicd_pipelines 
+                       on cicd_pipelines.id = cicd_tasks.pipeline_id`),
+                               dal.Join("left join cicd_pipeline_repos on 
cicd_pipelines.id = cicd_pipeline_repos.id"),
+                               dal.Where(
+                                       `cicd_pipeline_repos.repo = ? and 
cicd_tasks.finished_date < ? 
+                                                               and 
cicd_tasks.result = ? and cicd_tasks.type = ?`,
+                                       data.Options.RepoId, 
issueToBeUpdate.CreatedDate, "SUCCESS", "DEPLOY",
+                               ),
+                               dal.Orderby("cicd_tasks.finished_date DESC"),
+                       }
+                       err = db.First(cicdTask, cicdTakClauses...)
+                       if err != nil {
+                               return nil, err
+                       }
+                       issueToBeUpdate.DeploymentId = cicdTask.Id
+
+                       return []interface{}{issueToBeUpdate}, nil
+               },
+       })
+       if err != nil {
+               return err
+       }
+
+       return enricher.Execute()
+}
diff --git a/plugins/dora/tasks/task_data.go b/plugins/dora/tasks/task_data.go
index 6ef125a2..ebae0c34 100644
--- a/plugins/dora/tasks/task_data.go
+++ b/plugins/dora/tasks/task_data.go
@@ -25,8 +25,9 @@ type DoraApiParams struct {
 }
 
 type DoraOptions struct {
-       Tasks []string `json:"tasks,omitempty"`
-       Since string
+       Tasks  []string `json:"tasks,omitempty"`
+       Since  string
+       RepoId string
 }
 
 type DoraTaskData struct {
diff --git a/plugins/gitlab/tasks/commit_convertor.go 
b/plugins/gitlab/tasks/commit_convertor.go
index 2dbe61df..0fd5d230 100644
--- a/plugins/gitlab/tasks/commit_convertor.go
+++ b/plugins/gitlab/tasks/commit_convertor.go
@@ -33,7 +33,7 @@ var ConvertCommitsMeta = core.SubTaskMeta{
        Name:             "convertApiCommits",
        EntryPoint:       ConvertApiCommits,
        EnabledByDefault: false,
-       Description:      "Update domain layer commit according to 
GitlabCommit",
+       Description:      "Add domain layer commit according to GitlabCommit",
        DomainTypes:      []string{core.DOMAIN_TYPE_CODE},
 }
 
diff --git a/plugins/gitlab/tasks/mr_comment_convertor.go 
b/plugins/gitlab/tasks/mr_comment_convertor.go
index bb3206cb..66daf449 100644
--- a/plugins/gitlab/tasks/mr_comment_convertor.go
+++ b/plugins/gitlab/tasks/mr_comment_convertor.go
@@ -34,7 +34,7 @@ var ConvertMrCommentMeta = core.SubTaskMeta{
        Name:             "convertMergeRequestComment",
        EntryPoint:       ConvertMergeRequestComment,
        EnabledByDefault: true,
-       Description:      "Update domain layer Comment according to 
GitlabMrComment",
+       Description:      "Add domain layer Comment according to 
GitlabMrComment",
        DomainTypes:      []string{core.DOMAIN_TYPE_CODE_REVIEW},
 }
 
diff --git a/plugins/gitlab/tasks/mr_commit_convertor.go 
b/plugins/gitlab/tasks/mr_commit_convertor.go
index 68b5eb7a..13cc6c7d 100644
--- a/plugins/gitlab/tasks/mr_commit_convertor.go
+++ b/plugins/gitlab/tasks/mr_commit_convertor.go
@@ -32,7 +32,7 @@ var ConvertApiMrCommitsMeta = core.SubTaskMeta{
        Name:             "convertApiMergeRequestsCommits",
        EntryPoint:       ConvertApiMergeRequestsCommits,
        EnabledByDefault: true,
-       Description:      "Update domain layer PullRequestCommit according to 
GitlabMrCommit",
+       Description:      "Add domain layer PullRequestCommit according to 
GitlabMrCommit",
        DomainTypes:      []string{core.DOMAIN_TYPE_CODE_REVIEW},
 }
 
diff --git a/plugins/gitlab/tasks/mr_convertor.go 
b/plugins/gitlab/tasks/mr_convertor.go
index 4d0e7c11..53e0aac2 100644
--- a/plugins/gitlab/tasks/mr_convertor.go
+++ b/plugins/gitlab/tasks/mr_convertor.go
@@ -33,7 +33,7 @@ var ConvertApiMergeRequestsMeta = core.SubTaskMeta{
        Name:             "convertApiMergeRequests",
        EntryPoint:       ConvertApiMergeRequests,
        EnabledByDefault: true,
-       Description:      "Update domain layer PullRequest according to 
GitlabMergeRequest",
+       Description:      "Add domain layer PullRequest according to 
GitlabMergeRequest",
        DomainTypes:      []string{core.DOMAIN_TYPE_CODE_REVIEW},
 }
 
diff --git a/plugins/gitlab/tasks/project_convertor.go 
b/plugins/gitlab/tasks/project_convertor.go
index 59d271e3..62baaf07 100644
--- a/plugins/gitlab/tasks/project_convertor.go
+++ b/plugins/gitlab/tasks/project_convertor.go
@@ -36,7 +36,7 @@ var ConvertProjectMeta = core.SubTaskMeta{
        Name:             "convertApiProject",
        EntryPoint:       ConvertApiProjects,
        EnabledByDefault: true,
-       Description:      "Update domain layer Repo according to GitlabProject",
+       Description:      "Add domain layer Repo according to GitlabProject",
        DomainTypes:      []string{core.DOMAIN_TYPE_CODE, 
core.DOMAIN_TYPE_TICKET},
 }
 
diff --git a/plugins/helper/batch_save.go b/plugins/helper/batch_save.go
index d6990a3f..bb37e19c 100644
--- a/plugins/helper/batch_save.go
+++ b/plugins/helper/batch_save.go
@@ -27,8 +27,8 @@ import (
        "github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
-// BatchSave performs mulitple records persistence of a specific type in one 
sql query to improve the performance
-type BatchSave struct {
+// BatchShared is the base of BatchSave&BatchUpdate
+type BatchShared struct {
        basicRes core.BasicRes
        log      core.Logger
        db       dal.Dal
@@ -42,13 +42,18 @@ type BatchSave struct {
        primaryKey []reflect.StructField
 }
 
+// BatchSave performs multiple records persistence of a specific type in one 
sql query to improve the performance
+type BatchSave struct {
+       *BatchShared
+}
+
 // BatchUpdate will update records by batch
 type BatchUpdate struct {
-       *BatchSave
+       *BatchShared
 }
 
-// NewBatchSave creates a new BatchSave instance
-func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int) 
(*BatchSave, error) {
+// NewBatchShared creates a new BatchShared instance to be used in 
BatchSave&BatchUpdate
+func NewBatchShared(basicRes core.BasicRes, slotType reflect.Type, size int) 
(*BatchShared, error) {
        if slotType.Kind() != reflect.Ptr {
                return nil, errors.Default.New("slotType must be a pointer")
        }
@@ -60,7 +65,7 @@ func NewBatchSave(basicRes core.BasicRes, slotType 
reflect.Type, size int) (*Bat
        }
 
        log := basicRes.GetLogger().Nested(slotType.String())
-       return &BatchSave{
+       batchShared := &BatchShared{
                basicRes:   basicRes,
                log:        log,
                db:         db,
@@ -69,19 +74,26 @@ func NewBatchSave(basicRes core.BasicRes, slotType 
reflect.Type, size int) (*Bat
                size:       size,
                valueIndex: make(map[string]int),
                primaryKey: primaryKey,
-       }, nil
+       }
+       return batchShared, nil
+}
+
+// NewBatchSave creates a new BatchSave instance
+func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int) 
(*BatchSave, error) {
+       batchShared, err := NewBatchShared(basicRes, slotType, size)
+       if err != nil {
+               return nil, err
+       }
+       return &BatchSave{batchShared}, nil
 }
 
 // NewBatchUpdate creates a new BatchUpdate instance
 func NewBatchUpdate(basicRes core.BasicRes, slotType reflect.Type, size int) 
(*BatchUpdate, error) {
-       batchSave, err := NewBatchSave(basicRes, slotType, size)
+       batchShared, err := NewBatchShared(basicRes, slotType, size)
        if err != nil {
                return nil, err
        }
-       batchUpdate := BatchUpdate{
-               BatchSave: batchSave,
-       }
-       return &batchUpdate, nil
+       return &BatchUpdate{batchShared}, nil
 }
 
 // Add record to cache. BatchSave would flush them into Database when cache is 
max out
@@ -99,22 +111,22 @@ func (c *BatchSave) Add(slot interface{}) error {
        return nil
 }
 
-// Update record to cache. BatchSave would flush them into Database when cache 
is max out
-func (c *BatchUpdate) Update(slot interface{}) error {
+// Add record to cache. BatchUpdate would flush them into Database when cache 
is max out
+func (c *BatchUpdate) Add(slot interface{}) error {
        err := c.prepareForFlush(slot)
        if err != nil {
                return err
        }
        // flush out into database if max outed
        if c.current == c.size {
-               return c.FlushUpdate()
+               return c.Flush()
        } else if c.current%100 == 0 {
                c.log.Debug("batch save current: %d", c.current)
        }
        return nil
 }
 
-func (c *BatchSave) prepareForFlush(slot interface{}) error {
+func (c *BatchShared) prepareForFlush(slot interface{}) error {
        // type checking
        if reflect.TypeOf(slot) != c.slotType {
                return errors.Default.New("sub cache type mismatched")
@@ -150,8 +162,8 @@ func (c *BatchSave) Flush() error {
        return nil
 }
 
-// FlushUpdate update cached records into database
-func (c *BatchUpdate) FlushUpdate() error {
+// Flush updates the cached records in the database
+func (c *BatchUpdate) Flush() error {
        err := c.db.UpdateColumns(c.slots.Slice(0, c.current).Interface())
        if err != nil {
                return err
@@ -173,7 +185,7 @@ func (c *BatchSave) Close() error {
 // Close would flash the cache and release resources
 func (c *BatchUpdate) Close() error {
        if c.current > 0 {
-               return c.FlushUpdate()
+               return c.Flush()
        }
        return nil
 }
diff --git a/plugins/helper/batch_save_divider.go 
b/plugins/helper/batch_save_divider.go
index 57549a10..0555c986 100644
--- a/plugins/helper/batch_save_divider.go
+++ b/plugins/helper/batch_save_divider.go
@@ -27,30 +27,65 @@ import (
        "github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
-// BatchSaveDivider creates and caches BatchSave, this is helpful when dealing 
with massive amount of data records
-// with arbitrary types.
-type BatchSaveDivider struct {
+// BatchDivider is base struct of BatchSaveDivider&BatchUpdateDivider
+type BatchDivider struct {
        basicRes  core.BasicRes
-       log       core.Logger
        db        dal.Dal
-       batches   map[reflect.Type]*BatchSave
        batchSize int
        table     string
        params    string
 }
 
+// BatchSaveDivider creates and caches BatchSave, this is helpful when dealing 
with massive amount of data records
+// with arbitrary types.
+type BatchSaveDivider struct {
+       *BatchDivider
+       log     core.Logger
+       batches map[reflect.Type]*BatchSave
+}
+
+// BatchUpdateDivider creates and caches BatchUpdate, this is helpful when 
dealing with massive amount of data records
+// with arbitrary types.
+type BatchUpdateDivider struct {
+       *BatchDivider
+       log     core.Logger
+       batches map[reflect.Type]*BatchUpdate
+}
+
 // NewBatchSaveDivider create a new BatchInsertDivider instance
 func NewBatchSaveDivider(basicRes core.BasicRes, batchSize int, table string, 
params string) *BatchSaveDivider {
        log := basicRes.GetLogger().Nested("batch divider")
-       return &BatchSaveDivider{
+       batchDivider := &BatchDivider{
                basicRes:  basicRes,
-               log:       log,
                db:        basicRes.GetDal(),
-               batches:   make(map[reflect.Type]*BatchSave),
                batchSize: batchSize,
                table:     table,
                params:    params,
        }
+       batchSaveDivider := &BatchSaveDivider{
+               log:     log,
+               batches: make(map[reflect.Type]*BatchSave),
+       }
+       batchSaveDivider.BatchDivider = batchDivider
+       return batchSaveDivider
+}
+
+// NewBatchUpdateDivider creates a new BatchUpdateDivider instance
+func NewBatchUpdateDivider(basicRes core.BasicRes, batchSize int, table 
string, params string) *BatchUpdateDivider {
+       log := basicRes.GetLogger().Nested("batch update divider")
+       batchDivider := &BatchDivider{
+               basicRes:  basicRes,
+               db:        basicRes.GetDal(),
+               batchSize: batchSize,
+               table:     table,
+               params:    params,
+       }
+       batchUpdateDivider := &BatchUpdateDivider{
+               log:     log,
+               batches: make(map[reflect.Type]*BatchUpdate),
+       }
+       batchUpdateDivider.BatchDivider = batchDivider
+       return batchUpdateDivider
 }
 
 // ForType returns a `BatchSave` instance for specific type
@@ -97,3 +132,30 @@ func (d *BatchSaveDivider) Close() error {
        }
        return nil
 }
+
+// ForType returns a `BatchUpdate` instance for specific type
+func (d *BatchUpdateDivider) ForType(rowType reflect.Type) (*BatchUpdate, 
error) {
+       // get the cache for the specific type
+       batch := d.batches[rowType]
+       var err error
+       // create one if not exists
+       if batch == nil {
+               batch, err = NewBatchUpdate(d.basicRes, rowType, d.batchSize)
+               if err != nil {
+                       return nil, err
+               }
+               d.batches[rowType] = batch
+       }
+       return batch, nil
+}
+
+// Close all batches so the rest records get saved into db
+func (d *BatchUpdateDivider) Close() error {
+       for _, batch := range d.batches {
+               err := batch.Close()
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
diff --git a/plugins/helper/data_enricher.go b/plugins/helper/data_enricher.go
new file mode 100644
index 00000000..e2f37d70
--- /dev/null
+++ b/plugins/helper/data_enricher.go
@@ -0,0 +1,129 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package helper
+
+import (
+       "database/sql"
+       "github.com/apache/incubator-devlake/errors"
+       "reflect"
+
+       "github.com/apache/incubator-devlake/plugins/core"
+)
+
+// DataEnrichHandler Accept row from source cursor, return list of entities 
that need to be stored
+type DataEnrichHandler func(row interface{}) ([]interface{}, error)
+
+// DataEnricherArgs includes the arguments about DataEnricher.
+// This will be used in Creating a DataEnricher.
+//
+//     DataEnricherArgs {
+//                             InputRowType:           type of inputRow ,
+//                             Input:                  dal cursor,
+//                             RawDataSubTaskArgs: args about raw data task
+//                             Enrich:                         main function 
including conversion logic
+//                             BatchSize:                      batch size
+type DataEnricherArgs struct {
+       RawDataSubTaskArgs
+       // InputRowType is the type each row read from Input is fetched into
+       InputRowType reflect.Type
+       Input        *sql.Rows
+       Enrich       DataEnrichHandler
+       BatchSize    int
+}
+
+// DataEnricher helps you enrich (update) records with additional information.
+// It reads rows from the specified cursor and feeds each one into the `Enrich` handler.
+// You can return arbitrary entities from this handler; DataEnricher
+// groups them by type and performs a batch update operation for you
+// (no existing rows are deleted beforehand).
+type DataEnricher struct {
+       *RawDataSubTask
+       args *DataEnricherArgs
+}
+
+// NewDataEnricher function helps you create a DataEnricher using 
DataEnricherArgs.
+// You can see the usage in plugins/dora/tasks/issue_deploy_connector.go.
+func NewDataEnricher(args DataEnricherArgs) (*DataEnricher, error) {
+       rawDataSubTask, err := newRawDataSubTask(args.RawDataSubTaskArgs)
+       if err != nil {
+               return nil, err
+       }
+       // process args
+       if args.BatchSize == 0 {
+               args.BatchSize = 500
+       }
+       return &DataEnricher{
+               RawDataSubTask: rawDataSubTask,
+               args:           &args,
+       }, nil
+}
+
+// Execute function implements Subtask interface.
+// It reads rows from `args.Input` using `Ctx.GetDal()`, enriches them using 
the `enricher.args.Enrich` handler,
+// then writes the results back to the database using BatchUpdateDivider.
+func (enricher *DataEnricher) Execute() error {
+       // load data from database
+       db := enricher.args.Ctx.GetDal()
+
+       divider := NewBatchUpdateDivider(enricher.args.Ctx, 
enricher.args.BatchSize, enricher.table, enricher.params)
+
+       // set progress
+       enricher.args.Ctx.SetProgress(0, -1)
+
+       cursor := enricher.args.Input
+       defer cursor.Close()
+       ctx := enricher.args.Ctx.GetContext()
+       // iterate all rows
+       for cursor.Next() {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               default:
+               }
+               inputRow := reflect.New(enricher.args.InputRowType).Interface()
+               err := db.Fetch(cursor, inputRow)
+               if err != nil {
+                       return errors.Default.Wrap(err, "error fetching rows", 
errors.UserMessage("Internal Enricher execution error"))
+               }
+
+               results, err := enricher.args.Enrich(inputRow)
+               if err != nil {
+                       return errors.Default.Wrap(err, "error calling Enricher 
plugin implementation", errors.UserMessage("Internal Enricher execution error"))
+               }
+
+               for _, result := range results {
+                       // get the batch operator for the specific type
+                       batch, err := divider.ForType(reflect.TypeOf(result))
+                       if err != nil {
+                               return errors.Default.Wrap(err, "error getting 
batch from result", errors.UserMessage("Internal Enricher execution error"))
+                       }
+                       // records get saved into db when slots were max outed
+                       err = batch.Add(result)
+                       if err != nil {
+                               return errors.Default.Wrap(err, "error updating 
result to batch", errors.UserMessage("Internal Enricher execution error"))
+                       }
+               }
+               enricher.args.Ctx.IncProgress(1)
+       }
+
+       // save the last batches
+       return divider.Close()
+}
+
+// Check if DataEnricher implements SubTask interface
+var _ core.SubTask = (*DataEnricher)(nil)

Reply via email to