This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 078af650 refactor: refactor migrationscripts (#3473)
078af650 is described below

commit 078af650db51b00a3aa7c9a04582eaed417ab6a0
Author: mappjzc <[email protected]>
AuthorDate: Wed Oct 26 14:21:50 2022 +0800

    refactor: refactor migrationscripts (#3473)
    
    * refactor: refactor migrationscripts
    
    refactor migrationscripts for gitlab
    Add AddTablerColumn and DropTablerColumn for dal
    
    Nddtfjiang <[email protected]>
    
    * fix: fix for review
    
    Add UpdateColumns
    
    Nddtfjiang <[email protected]>
    
    * refactor: add change columns type one by one
    
    Add ChangeColumnsTypeOneByOne.
    
    Nddtfjiang <[email protected]>
    
    * refactor: merge auto migrate table
    
    merged AutoMigrateTables and AutoMigrate
    
    Nddtfjiang <[email protected]>
    
    * refactor: for review
    
    Changed function names and comments to address review feedback.
    
    Nddtfjiang <[email protected]>
    
    * refactor: refactor dal
    
    Rename UpdateColumns to UpdateAllColumn.
    Add UpdateColumns for modifying a specified group of columns.
    Remove RawCursor, as Klesh had in mind.
    
    Nddtfjiang <[email protected]>
    
    * fix: fix unchanged file lint error
    
    Fix a lint error reported by check annotations on an unchanged file.
    
    Nddtfjiang <[email protected]>
---
 helpers/migrationhelper/migrationhelper.go         | 132 +++++++++++++++++++++
 impl/dalgorm/dalgorm.go                            |  74 +++++++++---
 .../migrationscripts/20220903_encrypt_blueprint.go |   8 +-
 .../migrationscripts/20220904_encrypt_pipeline.go  |   8 +-
 .../20220905_modfiy_cicd_pipeline.go               |   6 +-
 .../migrationscripts/20220908_modfiy_cicd_tasks.go |   6 +-
 .../20220913_add_origin_value_for_pr.go            |   6 +-
 .../20220915_rename_pipeline_commits.go            |  17 +--
 .../20220929_change_leadtimeminutes_to_int64.go    |  52 +++-----
 plugins/azure/api/blueprint.go                     |   5 +-
 plugins/core/dal/dal.go                            |  38 ++++--
 plugins/gitlab/impl/impl.go                        |   6 +-
 .../migrationscripts/20220714_add_init_tables.go   |  41 ++++---
 ...odify_gilab_ci.go => 20220729_add_gitlab_ci.go} |  38 +++---
 .../migrationscripts/20220804_add_pipeline_id.go   |  17 +--
 .../20220906_fix_duration_to_float8.go             |  66 ++++-------
 .../20220907_add_pipeline_projects_tables.go       |  14 +--
 plugins/gitlab/models/migrationscripts/register.go |   8 +-
 plugins/jira/tasks/epic_collector.go               |  41 ++++---
 .../tasks/refs_pr_cherry_pick_calculator.go        |  43 +++----
 plugins/starrocks/tasks.go                         |  12 +-
 21 files changed, 406 insertions(+), 232 deletions(-)

diff --git a/helpers/migrationhelper/migrationhelper.go 
b/helpers/migrationhelper/migrationhelper.go
index f8f67acb..d02650b1 100644
--- a/helpers/migrationhelper/migrationhelper.go
+++ b/helpers/migrationhelper/migrationhelper.go
@@ -42,6 +42,138 @@ func AutoMigrateTables(basicRes core.BasicRes, dst 
...interface{}) errors.Error
        return nil
 }
 
+// ChangeColumnsType change the type of specified columns for the table
+func ChangeColumnsType[D any](
+       basicRes core.BasicRes,
+       script core.MigrationScript,
+       tableName string,
+       columns []string,
+       update func(tmpColumnParams []interface{}) errors.Error,
+) (err errors.Error) {
+       db := basicRes.GetDal()
+       tmpColumnsNames := make([]string, len(columns))
+       for i, v := range columns {
+               tmpColumnsNames[i] = fmt.Sprintf("%s_%s", v, hashScript(script))
+               err = db.RenameColumn(tableName, v, tmpColumnsNames[i])
+               if err != nil {
+                       return err
+               }
+
+               defer func(tmpColumnName string, ColumnsName string) {
+                       if err != nil {
+                               err1 := db.RenameColumn(tableName, 
tmpColumnName, ColumnsName)
+                               if err1 != nil {
+                                       err = errors.Default.Wrap(err, 
fmt.Sprintf("RollBack by RenameColum failed.Relevant data needs to be repaired 
manually.%s", err1.Error()))
+                               }
+                       }
+               }(tmpColumnsNames[i], v)
+       }
+
+       err = db.AutoMigrate(new(D), dal.From(tableName))
+       if err != nil {
+               return errors.Default.Wrap(err, "AutoMigrate for Add Colume 
Error")
+       }
+
+       defer func() {
+               if err != nil {
+                       err1 := db.DropColumns(tableName, columns...)
+                       if err1 != nil {
+                               err = errors.Default.Wrap(err, 
fmt.Sprintf("RollBack by DropColume failed.Relevant data needs to be repaired 
manually.%s", err1.Error()))
+                       }
+               }
+       }()
+
+       if update == nil {
+               dalSet := make([]dal.DalSet, 0, len(columns))
+               for i, v := range columns {
+                       dalSet = append(dalSet, dal.DalSet{
+                               ColumnName: v,
+                               Value:      dal.ClauseColumn{Name: 
tmpColumnsNames[i]},
+                       })
+               }
+               err = db.UpdateColumns(
+                       new(D),
+                       dalSet,
+               )
+       } else {
+               params := make([]interface{}, 0, len(tmpColumnsNames))
+               for _, v := range tmpColumnsNames {
+                       params = append(params, dal.ClauseColumn{Name: v})
+               }
+               err = update(params)
+       }
+       if err != nil {
+               return err
+       }
+
+       err = db.DropColumns(tableName, tmpColumnsNames...)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// TransformColumns change the type of specified columns for the table and 
transform data one by one
+func TransformColumns[S any, D any](
+       basicRes core.BasicRes,
+       script core.MigrationScript,
+       tableName string,
+       columns []string,
+       transform func(src *S) (*D, errors.Error),
+) (err errors.Error) {
+       db := basicRes.GetDal()
+       return ChangeColumnsType[D](
+               basicRes,
+               script,
+               tableName,
+               columns,
+               func(tmpColumnParams []interface{}) errors.Error {
+                       // create selectStr for transform tmpColumnsNames
+                       params := make([]interface{}, 0, len(columns)*2)
+                       selectStr := " * "
+                       for i, v := range columns {
+                               selectStr += ",? as ?"
+                               params = append(params, tmpColumnParams[i])
+                               params = append(params, dal.ClauseColumn{Name: 
v})
+                       }
+
+                       cursor, err := db.Cursor(
+                               dal.Select(selectStr, params...),
+                               dal.From(dal.ClauseTable{Name: tableName}),
+                       )
+                       if err != nil {
+                               return errors.Default.Wrap(err, 
fmt.Sprintf("failed to load data from src table [%s]", tableName))
+                       }
+
+                       defer cursor.Close()
+                       batch, err := helper.NewBatchSave(basicRes, 
reflect.TypeOf(new(D)), 200, tableName)
+                       if err != nil {
+                               return errors.Default.Wrap(err, 
fmt.Sprintf("failed to instantiate BatchSave for table [%s]", tableName))
+                       }
+                       defer batch.Close()
+                       src := new(S)
+                       for cursor.Next() {
+                               err = db.Fetch(cursor, src)
+                               if err != nil {
+                                       return errors.Default.Wrap(err, 
fmt.Sprintf("fail to load record from table [%s]", tableName))
+                               }
+
+                               dst, err := transform(src)
+
+                               if err != nil {
+                                       return errors.Default.Wrap(err, 
fmt.Sprintf("failed to update row %v", src))
+                               }
+                               err = batch.Add(dst)
+                               if err != nil {
+                                       return errors.Default.Wrap(err, 
fmt.Sprintf("push to BatchSave failed %v", dst))
+                               }
+                       }
+                       return nil
+               },
+       )
+}
+
 // TransformTable can be used when we need to change the table structure and 
reprocess all the data in the table.
 func TransformTable[S any, D any](
        basicRes core.BasicRes,
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 9801332e..9578afe1 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -35,15 +35,41 @@ type Dalgorm struct {
        db *gorm.DB
 }
 
+func transformParams(params []interface{}) []interface{} {
+       tp := make([]interface{}, 0, len(params))
+
+       for _, v := range params {
+               switch p := v.(type) {
+               case dal.ClauseColumn:
+                       tp = append(tp, clause.Column{
+                               Table: p.Table,
+                               Name:  p.Name,
+                               Alias: p.Alias,
+                               Raw:   p.Raw,
+                       })
+               case dal.ClauseTable:
+                       tp = append(tp, clause.Table{
+                               Name:  p.Name,
+                               Alias: p.Alias,
+                               Raw:   p.Raw,
+                       })
+               default:
+                       tp = append(tp, p)
+               }
+       }
+
+       return tp
+}
+
 func buildTx(tx *gorm.DB, clauses []dal.Clause) *gorm.DB {
        for _, c := range clauses {
                t := c.Type
                d := c.Data
                switch t {
                case dal.JoinClause:
-                       tx = tx.Joins(d.(dal.DalClause).Expr, 
d.(dal.DalClause).Params...)
+                       tx = tx.Joins(d.(dal.DalClause).Expr, 
transformParams(d.(dal.DalClause).Params)...)
                case dal.WhereClause:
-                       tx = tx.Where(d.(dal.DalClause).Expr, 
d.(dal.DalClause).Params...)
+                       tx = tx.Where(d.(dal.DalClause).Expr, 
transformParams(d.(dal.DalClause).Params)...)
                case dal.OrderbyClause:
                        tx = tx.Order(d.(string))
                case dal.LimitClause:
@@ -51,17 +77,26 @@ func buildTx(tx *gorm.DB, clauses []dal.Clause) *gorm.DB {
                case dal.OffsetClause:
                        tx = tx.Offset(d.(int))
                case dal.FromClause:
-                       if str, ok := d.(string); ok {
-                               tx = tx.Table(str)
-                       } else {
+                       switch dd := d.(type) {
+                       case string:
+                               tx = tx.Table(dd)
+                       case dal.DalClause:
+                               tx = tx.Table(dd.Expr, 
transformParams(dd.Params)...)
+                       case dal.ClauseTable:
+                               tx = tx.Table(" ? ", clause.Table{
+                                       Name:  dd.Name,
+                                       Alias: dd.Alias,
+                                       Raw:   dd.Raw,
+                               })
+                       default:
                                tx = tx.Model(d)
                        }
                case dal.SelectClause:
-                       tx = tx.Select(d.(string))
+                       tx = tx.Select(d.(dal.DalClause).Expr, 
transformParams(d.(dal.DalClause).Params)...)
                case dal.GroupbyClause:
                        tx = tx.Group(d.(string))
                case dal.HavingClause:
-                       tx = tx.Having(d.(dal.DalClause).Expr, 
d.(dal.DalClause).Params...)
+                       tx = tx.Having(d.(dal.DalClause).Expr, 
transformParams(d.(dal.DalClause).Params)...)
                }
        }
        return tx
@@ -69,14 +104,9 @@ func buildTx(tx *gorm.DB, clauses []dal.Clause) *gorm.DB {
 
 var _ dal.Dal = (*Dalgorm)(nil)
 
-// RawCursor executes raw sql query and returns a database cursor
-func (d *Dalgorm) RawCursor(query string, params ...interface{}) (*sql.Rows, 
errors.Error) {
-       return errors.Convert01(d.db.Raw(query, params...).Rows())
-}
-
 // Exec executes raw sql query
 func (d *Dalgorm) Exec(query string, params ...interface{}) errors.Error {
-       return errors.Convert(d.db.Exec(query, params...).Error)
+       return errors.Convert(d.db.Exec(query, 
transformParams(params)...).Error)
 }
 
 // AutoMigrate runs auto migration for given models
@@ -154,13 +184,27 @@ func (d *Dalgorm) Delete(entity interface{}, clauses 
...dal.Clause) errors.Error
 // UpdateColumn allows you to update mulitple records
 func (d *Dalgorm) UpdateColumn(entity interface{}, columnName string, value 
interface{}, clauses ...dal.Clause) errors.Error {
        if expr, ok := value.(dal.DalClause); ok {
-               value = gorm.Expr(expr.Expr, expr.Params...)
+               value = gorm.Expr(expr.Expr, transformParams(expr.Params)...)
        }
        return errors.Convert(buildTx(d.db, 
clauses).Model(entity).Update(columnName, value).Error)
 }
 
 // UpdateColumns allows you to update multiple columns of mulitple records
-func (d *Dalgorm) UpdateColumns(entity interface{}, clauses ...dal.Clause) 
errors.Error {
+func (d *Dalgorm) UpdateColumns(entity interface{}, set []dal.DalSet, clauses 
...dal.Clause) errors.Error {
+       updatesSet := make(map[string]interface{})
+
+       for _, s := range set {
+               if expr, ok := s.Value.(dal.DalClause); ok {
+                       s.Value = gorm.Expr(expr.Expr, 
transformParams(expr.Params)...)
+               }
+               updatesSet[s.ColumnName] = s.Value
+       }
+
+       return errors.Convert(buildTx(d.db, 
clauses).Model(entity).Updates(updatesSet).Error)
+}
+
+// UpdateAllColumn updated all Columns of entity
+func (d *Dalgorm) UpdateAllColumn(entity interface{}, clauses ...dal.Clause) 
errors.Error {
        return errors.Convert(buildTx(d.db, 
clauses).UpdateColumns(entity).Error)
 }
 
diff --git a/models/migrationscripts/20220903_encrypt_blueprint.go 
b/models/migrationscripts/20220903_encrypt_blueprint.go
index c5171bc1..d1ca7af7 100644
--- a/models/migrationscripts/20220903_encrypt_blueprint.go
+++ b/models/migrationscripts/20220903_encrypt_blueprint.go
@@ -30,7 +30,7 @@ var _ core.MigrationScript = (*encryptBlueprint)(nil)
 
 type encryptBlueprint struct{}
 
-type Blueprint20220903Before struct {
+type blueprint20220903Before struct {
        Name           string          `json:"name" validate:"required"`
        Mode           string          `json:"mode" gorm:"varchar(20)" 
validate:"required,oneof=NORMAL ADVANCED"`
        Plan           json.RawMessage `json:"plan"`
@@ -41,7 +41,7 @@ type Blueprint20220903Before struct {
        archived.Model `swaggerignore:"true"`
 }
 
-type Blueprint20220903After struct {
+type blueprint20220903After struct {
        /* unchanged part */
        Name           string `json:"name" validate:"required"`
        Mode           string `json:"mode" gorm:"varchar(20)" 
validate:"required,oneof=NORMAL ADVANCED"`
@@ -64,7 +64,7 @@ func (script *encryptBlueprint) Up(basicRes core.BasicRes) 
errors.Error {
                basicRes,
                script,
                "_devlake_blueprints",
-               func(s *Blueprint20220903Before) (*Blueprint20220903After, 
errors.Error) {
+               func(s *blueprint20220903Before) (*blueprint20220903After, 
errors.Error) {
                        encryptedPlan, err := core.Encrypt(encKey, 
string(s.Plan))
                        if err != nil {
                                return nil, err
@@ -74,7 +74,7 @@ func (script *encryptBlueprint) Up(basicRes core.BasicRes) 
errors.Error {
                                return nil, err
                        }
 
-                       dst := &Blueprint20220903After{
+                       dst := &blueprint20220903After{
                                Name:       s.Name,
                                Mode:       s.Mode,
                                Enable:     s.Enable,
diff --git a/models/migrationscripts/20220904_encrypt_pipeline.go 
b/models/migrationscripts/20220904_encrypt_pipeline.go
index 9f64dd86..34391975 100644
--- a/models/migrationscripts/20220904_encrypt_pipeline.go
+++ b/models/migrationscripts/20220904_encrypt_pipeline.go
@@ -32,7 +32,7 @@ var _ core.MigrationScript = (*encryptPipeline)(nil)
 
 type encryptPipeline struct{}
 
-type Pipeline20220904Before struct {
+type pipeline20220904Before struct {
        archived.Model
        Name          string         `json:"name" gorm:"index"`
        BlueprintId   uint64         `json:"blueprintId"`
@@ -47,7 +47,7 @@ type Pipeline20220904Before struct {
        Stage         int            `json:"stage"`
 }
 
-type Pipeline0904After struct {
+type pipeline0904After struct {
        common.Model
        Name          string     `json:"name" gorm:"index"`
        BlueprintId   uint64     `json:"blueprintId"`
@@ -72,13 +72,13 @@ func (script *encryptPipeline) Up(basicRes core.BasicRes) 
errors.Error {
                basicRes,
                script,
                "_devlake_pipelines",
-               func(s *Pipeline20220904Before) (*Pipeline0904After, 
errors.Error) {
+               func(s *pipeline20220904Before) (*pipeline0904After, 
errors.Error) {
                        encryptedPlan, err := core.Encrypt(encKey, 
string(s.Plan))
                        if err != nil {
                                return nil, err
                        }
 
-                       dst := &Pipeline0904After{
+                       dst := &pipeline0904After{
                                Name:          s.Name,
                                BlueprintId:   s.BlueprintId,
                                FinishedTasks: s.FinishedTasks,
diff --git a/models/migrationscripts/20220905_modfiy_cicd_pipeline.go 
b/models/migrationscripts/20220905_modfiy_cicd_pipeline.go
index 75ea909d..9b9dad98 100644
--- a/models/migrationscripts/20220905_modfiy_cicd_pipeline.go
+++ b/models/migrationscripts/20220905_modfiy_cicd_pipeline.go
@@ -28,13 +28,13 @@ var _ core.MigrationScript = (*modifyCicdPipeline)(nil)
 
 type modifyCicdPipeline struct{}
 
-type CICDPipelineRelationship20220905 struct {
+type cicdPipelineRelationship20220905 struct {
        ParentPipelineId string `gorm:"primaryKey;type:varchar(255)"`
        ChildPipelineId  string `gorm:"primaryKey;type:varchar(255)"`
        archived.NoPKModel
 }
 
-func (CICDPipelineRelationship20220905) TableName() string {
+func (cicdPipelineRelationship20220905) TableName() string {
        return "cicd_pipeline_relationships"
 }
 
@@ -48,7 +48,7 @@ func (*modifyCicdPipeline) Up(basicRes core.BasicRes) 
errors.Error {
        if err != nil {
                return err
        }
-       err = db.AutoMigrate(&CICDPipelineRelationship20220905{})
+       err = db.AutoMigrate(&cicdPipelineRelationship20220905{})
        if err != nil {
                return errors.Convert(err)
        }
diff --git a/models/migrationscripts/20220908_modfiy_cicd_tasks.go 
b/models/migrationscripts/20220908_modfiy_cicd_tasks.go
index fa669536..2f0d3472 100644
--- a/models/migrationscripts/20220908_modfiy_cicd_tasks.go
+++ b/models/migrationscripts/20220908_modfiy_cicd_tasks.go
@@ -26,16 +26,16 @@ var _ core.MigrationScript = (*modifyCICDTasks)(nil)
 
 type modifyCICDTasks struct{}
 
-type CICDTask0905 struct {
+type cicdTask0905 struct {
        Environment string `gorm:"type:varchar(255)"`
 }
 
-func (CICDTask0905) TableName() string {
+func (cicdTask0905) TableName() string {
        return "cicd_tasks"
 }
 
 func (*modifyCICDTasks) Up(basicRes core.BasicRes) errors.Error {
-       return basicRes.GetDal().AutoMigrate(&CICDTask0905{})
+       return basicRes.GetDal().AutoMigrate(&cicdTask0905{})
 }
 
 func (*modifyCICDTasks) Version() uint64 {
diff --git a/models/migrationscripts/20220913_add_origin_value_for_pr.go 
b/models/migrationscripts/20220913_add_origin_value_for_pr.go
index b26c61d8..3ce3d060 100644
--- a/models/migrationscripts/20220913_add_origin_value_for_pr.go
+++ b/models/migrationscripts/20220913_add_origin_value_for_pr.go
@@ -26,19 +26,19 @@ var _ core.MigrationScript = 
(*addOriginChangeValueForPr)(nil)
 
 type addOriginChangeValueForPr struct{}
 
-type PullRequest0913 struct {
+type pullRequest0913 struct {
        OrigCodingTimespan int64
        OrigReviewLag      int64
        OrigReviewTimespan int64
        OrigDeployTimespan int64
 }
 
-func (PullRequest0913) TableName() string {
+func (pullRequest0913) TableName() string {
        return "pull_requests"
 }
 
 func (*addOriginChangeValueForPr) Up(basicRes core.BasicRes) errors.Error {
-       return basicRes.GetDal().AutoMigrate(&PullRequest0913{})
+       return basicRes.GetDal().AutoMigrate(&pullRequest0913{})
 }
 
 func (*addOriginChangeValueForPr) Version() uint64 {
diff --git a/models/migrationscripts/20220915_rename_pipeline_commits.go 
b/models/migrationscripts/20220915_rename_pipeline_commits.go
index 69da85b7..6c46f670 100644
--- a/models/migrationscripts/20220915_rename_pipeline_commits.go
+++ b/models/migrationscripts/20220915_rename_pipeline_commits.go
@@ -27,18 +27,7 @@ var _ core.MigrationScript = (*renamePipelineCommits)(nil)
 
 type renamePipelineCommits struct{}
 
-type CiCDPipelineRepo20220915Before struct {
-       archived.DomainEntity
-       CommitSha string `gorm:"primaryKey;type:varchar(255)"`
-       Branch    string `gorm:"type:varchar(255)"`
-       Repo      string `gorm:"type:varchar(255)"`
-}
-
-func (CiCDPipelineRepo20220915Before) TableName() string {
-       return "cicd_pipeline_repos"
-}
-
-type CiCDPipelineRepo20220915After struct {
+type cicdPipelineRepo20220915After struct {
        archived.NoPKModel
        PipelineId string `gorm:"primaryKey;type:varchar(255)"`
        CommitSha  string `gorm:"primaryKey;type:varchar(255)"`
@@ -47,7 +36,7 @@ type CiCDPipelineRepo20220915After struct {
        RepoUrl    string
 }
 
-func (CiCDPipelineRepo20220915After) TableName() string {
+func (cicdPipelineRepo20220915After) TableName() string {
        return "cicd_pipeline_commits"
 }
 
@@ -65,7 +54,7 @@ func (*renamePipelineCommits) Up(basicRes core.BasicRes) 
errors.Error {
        if err != nil {
                return err
        }
-       err = db.AutoMigrate(CiCDPipelineRepo20220915After{})
+       err = db.AutoMigrate(cicdPipelineRepo20220915After{})
        if err != nil {
                return err
        }
diff --git 
a/models/migrationscripts/20220929_change_leadtimeminutes_to_int64.go 
b/models/migrationscripts/20220929_change_leadtimeminutes_to_int64.go
index 553b4db7..bce665b2 100644
--- a/models/migrationscripts/20220929_change_leadtimeminutes_to_int64.go
+++ b/models/migrationscripts/20220929_change_leadtimeminutes_to_int64.go
@@ -19,6 +19,7 @@ package migrationscripts
 
 import (
        "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/core/dal"
 )
@@ -27,52 +28,33 @@ var _ core.MigrationScript = 
(*changeLeadTimeMinutesToInt64)(nil)
 
 type changeLeadTimeMinutesToInt64 struct{}
 
-type Issues20220929 struct {
+type issues20220929 struct {
        LeadTimeMinutes int64
 }
 
-func (Issues20220929) TableName() string {
+func (issues20220929) TableName() string {
        return "issues"
 }
 
-func (*changeLeadTimeMinutesToInt64) Up(basicRes core.BasicRes) errors.Error {
+func (script *changeLeadTimeMinutesToInt64) Up(basicRes core.BasicRes) 
errors.Error {
        // Yes, issues.lead_time_minutes might be negative, we ought to change 
the type
        // for the column from `uint` to `int64`
        // related issue: 
https://github.com/apache/incubator-devlake/issues/3224
        db := basicRes.GetDal()
-       bakColumnName := "lead_time_minutes_20220929"
-       err := db.RenameColumn("issues", "lead_time_minutes", bakColumnName)
-       defer func() {
-               if err != nil {
-                       _ = db.RenameColumn("issues", bakColumnName, 
"lead_time_minutes")
-               }
-       }()
-       if err != nil {
-               return err
-       }
-       err = db.AutoMigrate(&Issues20220929{})
-       if err != nil {
-               return err
-       }
-       defer func() {
-               if err != nil {
-                       _ = db.DropColumns("issues", "lead_time_minutes")
-               }
-       }()
-       err = db.UpdateColumn(
-               &Issues20220929{},
-               "lead_time_minutes",
-               dal.DalClause{Expr: bakColumnName},
-               dal.Where("lead_time_minutes != 0"),
+       return migrationhelper.ChangeColumnsType[issues20220929](
+               basicRes,
+               script,
+               issues20220929{}.TableName(),
+               []string{"lead_time_minutes"},
+               func(tmpColumnParams []interface{}) errors.Error {
+                       return db.UpdateColumn(
+                               &issues20220929{},
+                               "lead_time_minutes",
+                               dal.DalClause{Expr: " ? ", Params: 
tmpColumnParams},
+                               dal.Where("? != 0", tmpColumnParams...),
+                       )
+               },
        )
-       if err != nil {
-               return err
-       }
-       err = db.DropColumns("issues", bakColumnName)
-       if err != nil {
-               return err
-       }
-       return nil
 }
 
 func (*changeLeadTimeMinutesToInt64) Version() uint64 {
diff --git a/plugins/azure/api/blueprint.go b/plugins/azure/api/blueprint.go
index 6598aae8..29d6612b 100644
--- a/plugins/azure/api/blueprint.go
+++ b/plugins/azure/api/blueprint.go
@@ -19,6 +19,7 @@ package api
 
 import (
        "encoding/json"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
@@ -31,8 +32,8 @@ func MakePipelinePlan(subtaskMetas []core.SubTaskMeta, 
connectionId uint64, scop
        for i, scopeElem := range scope {
                // handle taskOptions and transformationRules, by dumping them 
to taskOptions
                taskOptions := make(map[string]interface{})
-               err = errors.Default.Wrap(json.Unmarshal(scopeElem.Options, 
&taskOptions), "unable to deserialize pipeline task options")
-               if err != nil {
+               err1 := json.Unmarshal(scopeElem.Options, &taskOptions)
+               if err1 != nil {
                        return nil, errors.Default.Wrap(err, "unable to 
deserialize pipeline task options")
                }
                taskOptions["connectionId"] = connectionId
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index a0121256..4d51a6b5 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -34,6 +34,21 @@ type Clause struct {
        Data interface{}
 }
 
+// ClauseColumn quote with name
+type ClauseColumn struct {
+       Table string
+       Name  string
+       Alias string
+       Raw   bool
+}
+
+// ClauseTable quote with name
+type ClauseTable struct {
+       Name  string
+       Alias string
+       Raw   bool
+}
+
 // ColumnMeta column type interface
 type ColumnMeta interface {
        Name() string
@@ -60,8 +75,6 @@ type Dal interface {
        DropColumns(table string, columnName ...string) errors.Error
        // Exec executes raw sql query
        Exec(query string, params ...interface{}) errors.Error
-       // RawCursor executes raw sql query and returns a database cursor
-       RawCursor(query string, params ...interface{}) (*sql.Rows, errors.Error)
        // Cursor returns a database cursor, cursor is especially useful when 
handling big amount of rows of data
        Cursor(clauses ...Clause) (*sql.Rows, errors.Error)
        // Fetch loads row data from `cursor` into `dst`
@@ -81,7 +94,9 @@ type Dal interface {
        // UpdateColumn allows you to update mulitple records
        UpdateColumn(entity interface{}, columnName string, value interface{}, 
clauses ...Clause) errors.Error
        // UpdateColumn allows you to update multiple columns of mulitple 
records
-       UpdateColumns(entity interface{}, clauses ...Clause) errors.Error
+       UpdateColumns(entity interface{}, set []DalSet, clauses ...Clause) 
errors.Error
+       // UpdateAllColumn updated all Columns of entity
+       UpdateAllColumn(entity interface{}, clauses ...Clause) errors.Error
        // CreateOrUpdate tries to create the record, or fallback to update all 
if failed
        CreateOrUpdate(entity interface{}, clauses ...Clause) errors.Error
        // CreateIfNotExist tries to create the record if not exist
@@ -143,6 +158,11 @@ type DalClause struct {
        Params []interface{}
 }
 
+type DalSet struct {
+       ColumnName string
+       Value      interface{}
+}
+
 const JoinClause string = "Join"
 
 // Join creates a new JoinClause
@@ -174,15 +194,19 @@ func Offset(offset int) Clause {
 const FromClause string = "From"
 
 // From creates a new TableClause
-func From(table interface{}) Clause {
-       return Clause{Type: FromClause, Data: table}
+func From(table interface{}, params ...interface{}) Clause {
+       if len(params) == 0 {
+               return Clause{Type: FromClause, Data: table}
+       } else {
+               return Clause{Type: FromClause, Data: DalClause{table.(string), 
params}}
+       }
 }
 
 const SelectClause string = "Select"
 
 // Select creates a new TableClause
-func Select(fields string) Clause {
-       return Clause{Type: SelectClause, Data: fields}
+func Select(clause string, params ...interface{}) Clause {
+       return Clause{Type: SelectClause, Data: DalClause{clause, params}}
 }
 
 const OrderbyClause string = "OrderBy"
diff --git a/plugins/gitlab/impl/impl.go b/plugins/gitlab/impl/impl.go
index 9127dffb..af16454b 100644
--- a/plugins/gitlab/impl/impl.go
+++ b/plugins/gitlab/impl/impl.go
@@ -19,9 +19,9 @@ package impl
 
 import (
        "fmt"
+
        "github.com/apache/incubator-devlake/errors"
 
-       "github.com/apache/incubator-devlake/migration"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/gitlab/api"
        "github.com/apache/incubator-devlake/plugins/gitlab/models"
@@ -37,7 +37,7 @@ var _ core.PluginInit = (*Gitlab)(nil)
 var _ core.PluginModel = (*Gitlab)(nil)
 var _ core.PluginTask = (*Gitlab)(nil)
 var _ core.PluginApi = (*Gitlab)(nil)
-var _ core.Migratable = (*Gitlab)(nil)
+var _ core.PluginMigration = (*Gitlab)(nil)
 var _ core.PluginBlueprintV100 = (*Gitlab)(nil)
 var _ core.CloseablePluginTask = (*Gitlab)(nil)
 
@@ -144,7 +144,7 @@ func (plugin Gitlab) RootPkgPath() string {
        return "github.com/apache/incubator-devlake/plugins/gitlab"
 }
 
-func (plugin Gitlab) MigrationScripts() []migration.Script {
+func (plugin Gitlab) MigrationScripts() []core.MigrationScript {
        return migrationscripts.All()
 }
 
diff --git a/plugins/gitlab/models/migrationscripts/20220714_add_init_tables.go 
b/plugins/gitlab/models/migrationscripts/20220714_add_init_tables.go
index 7e5eec92..ad22091b 100644
--- a/plugins/gitlab/models/migrationscripts/20220714_add_init_tables.go
+++ b/plugins/gitlab/models/migrationscripts/20220714_add_init_tables.go
@@ -18,19 +18,19 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
-       "github.com/apache/incubator-devlake/config"
+       "strconv"
+
        "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
        "github.com/apache/incubator-devlake/plugins/core"
        
"github.com/apache/incubator-devlake/plugins/gitlab/models/migrationscripts/archived"
-       "gorm.io/gorm"
-       "gorm.io/gorm/clause"
 )
 
 type addInitTables struct{}
 
-func (*addInitTables) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().DropTable(
+func (*addInitTables) Up(baseRes core.BasicRes) errors.Error {
+       db := baseRes.GetDal()
+       err := db.DropTables(
                &archived.GitlabProject{},
                &archived.GitlabMergeRequest{},
                &archived.GitlabCommit{},
@@ -61,10 +61,11 @@ func (*addInitTables) Up(ctx context.Context, db *gorm.DB) 
errors.Error {
        )
 
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
 
-       err = db.Migrator().AutoMigrate(
+       err = migrationhelper.AutoMigrateTables(
+               baseRes,
                &archived.GitlabProject{},
                &archived.GitlabMergeRequest{},
                &archived.GitlabCommit{},
@@ -83,13 +84,12 @@ func (*addInitTables) Up(ctx context.Context, db *gorm.DB) 
errors.Error {
        )
 
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
 
-       v := config.GetConfig()
-       encKey := v.GetString("ENCODE_KEY")
-       endPoint := v.GetString("GITLAB_ENDPOINT")
-       gitlabAuth := v.GetString("GITLAB_AUTH")
+       encKey := baseRes.GetConfig("ENCODE_KEY")
+       endPoint := baseRes.GetConfig("GITLAB_ENDPOINT")
+       gitlabAuth := baseRes.GetConfig("GITLAB_AUTH")
 
        if encKey == "" || endPoint == "" || gitlabAuth == "" {
                return nil
@@ -100,15 +100,18 @@ func (*addInitTables) Up(ctx context.Context, db 
*gorm.DB) errors.Error {
        conn.Endpoint = endPoint
        conn.Token, err = core.Encrypt(encKey, gitlabAuth)
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       conn.Proxy = v.GetString("GITLAB_PROXY")
-       conn.RateLimitPerHour = v.GetInt("GITLAB_API_REQUESTS_PER_HOUR")
-
-       err = db.Clauses(clause.OnConflict{DoNothing: true}).Create(conn).Error
+       conn.Proxy = baseRes.GetConfig("GITLAB_PROXY")
+       var err1 error
+       conn.RateLimitPerHour, err1 = strconv.Atoi(baseRes.GetConfig("GITLAB_API_REQUESTS_PER_HOUR"))
+       if err1 != nil {
+               conn.RateLimitPerHour = 1000
+       }
+       err = db.CreateIfNotExist(conn)
 
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
 
        return nil
diff --git a/plugins/gitlab/models/migrationscripts/20220729_modify_gilab_ci.go 
b/plugins/gitlab/models/migrationscripts/20220729_add_gitlab_ci.go
similarity index 76%
rename from plugins/gitlab/models/migrationscripts/20220729_modify_gilab_ci.go
rename to plugins/gitlab/models/migrationscripts/20220729_add_gitlab_ci.go
index 2d7ae899..4a178acd 100644
--- a/plugins/gitlab/models/migrationscripts/20220729_modify_gilab_ci.go
+++ b/plugins/gitlab/models/migrationscripts/20220729_add_gitlab_ci.go
@@ -18,17 +18,20 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
-       "github.com/apache/incubator-devlake/errors"
        "time"
 
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
+       "github.com/apache/incubator-devlake/plugins/core"
+
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
 )
 
-type modifyGitlabCI struct{}
+// addGitlabCI adds the gitlab job table for GitlabCI
+// and adds gitlab_updated_at on the gitlab pipeline table for GitlabCI
+type addGitlabCI struct{}
 
-type GitlabPipeline20220729 struct {
+type gitlabPipeline20220729 struct {
        ConnectionId uint64 `gorm:"primaryKey"`
 
        GitlabId  int    `gorm:"primaryKey"`
@@ -48,11 +51,11 @@ type GitlabPipeline20220729 struct {
        archived.NoPKModel
 }
 
-func (GitlabPipeline20220729) TableName() string {
+func (gitlabPipeline20220729) TableName() string {
        return "_tool_gitlab_pipelines"
 }
 
-type GitlabJob20220729 struct {
+type gitlabJob20220729 struct {
        ConnectionId uint64 `gorm:"primaryKey"`
 
        GitlabId     int     `gorm:"primaryKey"`
@@ -73,28 +76,27 @@ type GitlabJob20220729 struct {
        archived.NoPKModel
 }
 
-func (GitlabJob20220729) TableName() string {
+func (gitlabJob20220729) TableName() string {
        return "_tool_gitlab_jobs"
 }
 
-func (*modifyGitlabCI) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().AddColumn(&GitlabPipeline20220729{}, 
"gitlab_updated_at")
-       if err != nil {
-               return errors.Convert(err)
-       }
-
-       err = db.Migrator().AutoMigrate(&GitlabJob20220729{})
+func (*addGitlabCI) Up(baseRes core.BasicRes) errors.Error {
+       err := migrationhelper.AutoMigrateTables(
+               baseRes,
+               &gitlabJob20220729{},
+               &gitlabPipeline20220729{},
+       )
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
 
        return nil
 }
 
-func (*modifyGitlabCI) Version() uint64 {
+func (*addGitlabCI) Version() uint64 {
        return 20220729231236
 }
 
-func (*modifyGitlabCI) Name() string {
+func (*addGitlabCI) Name() string {
        return "pipeline and job"
 }
diff --git a/plugins/gitlab/models/migrationscripts/20220804_add_pipeline_id.go 
b/plugins/gitlab/models/migrationscripts/20220804_add_pipeline_id.go
index 678543f8..17228d52 100644
--- a/plugins/gitlab/models/migrationscripts/20220804_add_pipeline_id.go
+++ b/plugins/gitlab/models/migrationscripts/20220804_add_pipeline_id.go
@@ -18,17 +18,17 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
-       "github.com/apache/incubator-devlake/errors"
        "time"
 
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/plugins/core"
+
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
 )
 
 type addPipelineID struct{}
 
-type GitlabJob20220804 struct {
+type gitlabJob20220804 struct {
        ConnectionId uint64 `gorm:"primaryKey"`
 
        GitlabId     int     `gorm:"primaryKey"`
@@ -50,14 +50,15 @@ type GitlabJob20220804 struct {
        archived.NoPKModel
 }
 
-func (GitlabJob20220804) TableName() string {
+func (gitlabJob20220804) TableName() string {
        return "_tool_gitlab_jobs"
 }
 
-func (*addPipelineID) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().AddColumn(&GitlabJob20220804{}, "pipeline_id")
+func (*addPipelineID) Up(baseRes core.BasicRes) errors.Error {
+       db := baseRes.GetDal()
+       err := db.AutoMigrate(&gitlabJob20220804{})
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
        return nil
 }
diff --git 
a/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go 
b/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
index fb62cc82..19183bb7 100644
--- a/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
+++ b/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
@@ -18,63 +18,45 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
-       "github.com/apache/incubator-devlake/plugins/gitlab/api"
-       "github.com/apache/incubator-devlake/plugins/helper"
-       "gorm.io/gorm"
-       "reflect"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
 type fixDurationToFloat8 struct{}
 
-type GitlabJob20220906 struct {
+type gitlabJob20220906_old struct {
        ConnectionId uint64 `gorm:"primaryKey"`
        GitlabId     int    `gorm:"primaryKey"`
 
-       Duration  float64 `gorm:"type:text"`
-       Duration2 float64 `gorm:"type:float8"`
+       Duration float64 `gorm:"type:text"`
 }
+type gitlabJob20220906 struct {
+       ConnectionId uint64 `gorm:"primaryKey"`
+       GitlabId     int    `gorm:"primaryKey"`
 
-func (GitlabJob20220906) TableName() string {
-       return "_tool_gitlab_jobs"
+       Duration float64 `gorm:"type:float8"`
 }
 
-func (*fixDurationToFloat8) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().AddColumn(&GitlabJob20220906{}, `duration2`)
-       if err != nil {
-               return errors.Convert(err)
-       }
-       cursor, err := 
db.Model(&GitlabJob20220906{}).Select([]string{"connection_id", "gitlab_id", 
"duration"}).Rows()
-       if err != nil {
-               return errors.Convert(err)
-       }
-       batch, err := helper.NewBatchSave(api.BasicRes, 
reflect.TypeOf(&GitlabJob20220906{}), 500)
-       if err != nil {
-               return errors.Default.Wrap(err, "error getting batch from 
table")
-       }
-       defer batch.Close()
-       for cursor.Next() {
-               job := GitlabJob20220906{}
-               err = db.ScanRows(cursor, &job)
-               if err != nil {
-                       return errors.Convert(err)
-               }
-               job.Duration2 = job.Duration
-               err = batch.Add(&job)
-               if err != nil {
-                       return errors.Convert(err)
-               }
-       }
+func (*fixDurationToFloat8) Up(baseRes core.BasicRes) errors.Error {
+       err := migrationhelper.TransformColumns(
+               baseRes,
+               &fixDurationToFloat8{},
+               "_tool_gitlab_jobs",
+               []string{"duration"},
+               func(src *gitlabJob20220906_old) (*gitlabJob20220906, errors.Error) {
+                       return &gitlabJob20220906{
+                               ConnectionId: src.ConnectionId,
+                               GitlabId:     src.GitlabId,
+                               Duration:     src.Duration,
+                       }, nil
+               },
+       )
 
-       err = db.Migrator().DropColumn(&GitlabJob20220906{}, `duration`)
        if err != nil {
-               return errors.Convert(err)
-       }
-       err = db.Migrator().RenameColumn(&GitlabJob20220906{}, `duration2`, 
`duration`)
-       if err != nil {
-               return errors.Convert(err)
+               return err
        }
+
        return nil
 }
 
diff --git 
a/plugins/gitlab/models/migrationscripts/20220907_add_pipeline_projects_tables.go
 
b/plugins/gitlab/models/migrationscripts/20220907_add_pipeline_projects_tables.go
index 456ea859..84bf5c44 100644
--- 
a/plugins/gitlab/models/migrationscripts/20220907_add_pipeline_projects_tables.go
+++ 
b/plugins/gitlab/models/migrationscripts/20220907_add_pipeline_projects_tables.go
@@ -18,16 +18,16 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
+       "github.com/apache/incubator-devlake/plugins/core"
 
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
 )
 
 type addPipelineProjects struct{}
 
-type GitlabPipelineProjects20220907 struct {
+type gitlabPipelineProjects20220907 struct {
        ConnectionId uint64 `gorm:"primaryKey"`
        PipelineId   int    `gorm:"primaryKey"`
        ProjectId    int    `gorm:"primaryKey"`
@@ -36,14 +36,14 @@ type GitlabPipelineProjects20220907 struct {
        archived.NoPKModel
 }
 
-func (GitlabPipelineProjects20220907) TableName() string {
+func (gitlabPipelineProjects20220907) TableName() string {
        return "_tool_gitlab_pipeline_projects"
 }
 
-func (*addPipelineProjects) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().CreateTable(&GitlabPipelineProjects20220907{})
+func (*addPipelineProjects) Up(baseRes core.BasicRes) errors.Error {
+       err := migrationhelper.AutoMigrateTables(baseRes, &gitlabPipelineProjects20220907{})
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
        return nil
 }
diff --git a/plugins/gitlab/models/migrationscripts/register.go 
b/plugins/gitlab/models/migrationscripts/register.go
index aba5abaf..a91a1fcd 100644
--- a/plugins/gitlab/models/migrationscripts/register.go
+++ b/plugins/gitlab/models/migrationscripts/register.go
@@ -18,14 +18,14 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "github.com/apache/incubator-devlake/migration"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
 // All returns all the migration scripts
-func All() []migration.Script {
-       return []migration.Script{
+func All() []core.MigrationScript {
+       return []core.MigrationScript{
                new(addInitTables),
-               new(modifyGitlabCI),
+               new(addGitlabCI),
                new(addPipelineID),
                new(addPipelineProjects),
                new(fixDurationToFloat8),
diff --git a/plugins/jira/tasks/epic_collector.go 
b/plugins/jira/tasks/epic_collector.go
index 3b09cee3..78e6c31d 100644
--- a/plugins/jira/tasks/epic_collector.go
+++ b/plugins/jira/tasks/epic_collector.go
@@ -19,17 +19,19 @@ package tasks
 
 import (
        "fmt"
+       "reflect"
+       "strings"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/core/dal"
-       "reflect"
-       "strings"
 
        "encoding/json"
-       "github.com/apache/incubator-devlake/plugins/helper"
        "io"
        "net/http"
        "net/url"
+
+       "github.com/apache/incubator-devlake/plugins/helper"
 )
 
 const RAW_EPIC_TABLE = "jira_api_epics"
@@ -108,23 +110,24 @@ func CollectEpics(taskCtx core.SubTaskContext) 
errors.Error {
 }
 
 func GetEpicKeysIterator(db dal.Dal, data *JiraTaskData, batchSize int) 
(helper.Iterator, errors.Error) {
-       cursor, err := db.RawCursor(`
-                       SELECT
-                               DISTINCT epic_key
-                       FROM
-                               _tool_jira_issues i
+       cursor, err := db.Cursor(
+               dal.Select("DISTINCT epic_key"),
+               dal.From("_tool_jira_issues i"),
+               dal.Join(`
                        LEFT JOIN _tool_jira_board_issues bi ON (
-                               i.connection_id = bi.connection_id
-                               AND 
-                               i.issue_id = bi.issue_id
-                       )
-                       WHERE
-                               i.connection_id = ?
-                               AND 
-                               bi.board_id = ?
-                               AND
-                               i.epic_key != ''
-               `, data.Options.ConnectionId, data.Options.BoardId)
+                       i.connection_id = bi.connection_id
+                       AND 
+                       i.issue_id = bi.issue_id
+               )`),
+               dal.Where(`
+                       i.connection_id = ?
+                       AND 
+                       bi.board_id = ?
+                       AND
+                       i.epic_key != ''
+               `, data.Options.ConnectionId, data.Options.BoardId,
+               ),
+       )
        if err != nil {
                return nil, errors.Default.Wrap(err, "unable to query for 
external epics")
        }
diff --git a/plugins/refdiff/tasks/refs_pr_cherry_pick_calculator.go 
b/plugins/refdiff/tasks/refs_pr_cherry_pick_calculator.go
index 3154798b..0daeb390 100644
--- a/plugins/refdiff/tasks/refs_pr_cherry_pick_calculator.go
+++ b/plugins/refdiff/tasks/refs_pr_cherry_pick_calculator.go
@@ -18,12 +18,13 @@ limitations under the License.
 package tasks
 
 import (
-       "github.com/apache/incubator-devlake/errors"
        "regexp"
        "strconv"
        "strings"
        "time"
 
+       "github.com/apache/incubator-devlake/errors"
+
        "github.com/apache/incubator-devlake/models/domainlayer/code"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/core/dal"
@@ -114,25 +115,27 @@ func CalculatePrCherryPick(taskCtx core.SubTaskContext) 
errors.Error {
                taskCtx.IncProgress(1)
        }
 
-       cursor2, err := db.RawCursor(
-               `
-                       SELECT pr2.pull_request_key                 AS 
parent_pr_key,
-                              pr1.parent_pr_id                     AS 
parent_pr_id,
-                              pr1.base_ref                         AS 
cherrypick_base_branch,
-                              pr1.pull_request_key                 AS 
cherrypick_pr_key,
-                              repos.NAME                           AS 
repo_name,
-                              Concat(repos.url, '/pull/', 
pr2.pull_request_key) AS parent_pr_url,
-                                  pr2.created_date
-                       FROM   pull_requests pr1
-                              LEFT JOIN pull_requests pr2
-                                     ON pr1.parent_pr_id = pr2.id
-                              LEFT JOIN repos
-                                     ON pr2.base_repo_id = repos.id
-                       WHERE  pr1.parent_pr_id != ''
-                       ORDER  BY pr1.parent_pr_id,
-                                 pr2.created_date,
-                                         pr1.base_ref ASC
-                       `)
+       cursor2, err := db.Cursor(
+               dal.Select(`
+                       pr2.pull_request_key                 AS parent_pr_key,
+                       pr1.parent_pr_id                     AS parent_pr_id,
+                       pr1.base_ref                         AS 
cherrypick_base_branch,
+                       pr1.pull_request_key                 AS 
cherrypick_pr_key,
+                       repos.NAME                           AS repo_name,
+                       Concat(repos.url, '/pull/', pr2.pull_request_key) AS 
parent_pr_url,
+                       pr2.created_date
+               `),
+               dal.From(`pull_requests pr1`),
+               dal.Join(`LEFT JOIN pull_requests pr2 ON pr1.parent_pr_id = 
pr2.id`),
+               dal.Join(`LEFT JOIN repos ON pr2.base_repo_id = repos.id`),
+
+               dal.Where("pr1.parent_pr_id != ''"),
+               dal.Orderby(`
+                       pr1.parent_pr_id,
+                       pr2.created_date,
+                       pr1.base_ref ASC
+               `),
+       )
        if err != nil {
                return errors.Convert(err)
        }
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 19440210..f2ea396b 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -223,7 +223,12 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, 
starrocksTable, starrock
                var rows *sql.Rows
                var data []map[string]interface{}
                // select data from db
-               rows, err = db.RawCursor(fmt.Sprintf("select * from %s order by 
%s limit %d offset %d", table, orderBy, config.BatchSize, offset))
+               rows, err = db.Cursor(
+                       dal.From(table),
+                       dal.Orderby(orderBy),
+                       dal.Limit(config.BatchSize),
+                       dal.Offset(offset),
+               )
                if err != nil {
                        return err
                }
@@ -337,7 +342,10 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, 
starrocksTable, starrock
                return err
        }
        // check data count
-       rows, err := db.RawCursor(fmt.Sprintf("select count(*) from %s", table))
+       rows, err := db.Cursor(
+               dal.Select("count(*)"),
+               dal.From(table),
+       )
        if err != nil {
                return err
        }


Reply via email to