This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 447920a2 Migration adopts `BasicRes` (#3176)
447920a2 is described below

commit 447920a22540eca255bd413cb63d5eeda41f6150
Author: Klesh Wong <[email protected]>
AuthorDate: Tue Oct 18 11:05:04 2022 +0800

    Migration adopts `BasicRes` (#3176)
    
    * refactor: migration adopts Dal
    
    * refactor: updating framework script to adopt new migratable interface
    
    * refactor: updating framework migration scripts 20220830
    
    * refactor: unify migrationhelper
    
    * refactor: updated encrypt blueprint migration script
    
    * refactor: updated encrypt_pipline migration script
    
    * refactor: updated 20220905_modfiy_cicd_pipeline
    
    * refactor: updated 20220908_modfiy_cicd_tasks
    
    * refactor: updated add_origin_value_for_pr
    
    * refactor: updated fix_commitfile_id_toolong script
    
    * refactor: updated modfiy_board_repos script
    
    * refactor: updated rename_pipeline_commits
    
    * refactor: upgrade dal.DropColumn to dal.DropColumns
    
    * refactor: updated all framework level scripts
    
    * style: linting
    
    * fix: all scripts executed successfully
    
    * fix: pg cached plan error
    
    * refactor: rename modify_board_repos to add_raw_data_origin_to_board_repos
    
    * fix: remove unaffected fields for script
    
    * fix: migration interfaces should coexists at this point
    
    * fix: linting
    
    * fix: e2e-test
---
 .env.example                                       |   3 +-
 helpers/migrationhelper/helper.go                  | 187 ----------------
 helpers/migrationhelper/migrationhelper.go         | 138 ++++++++++++
 impl/dalgorm/dalgorm.go                            |  81 +++++--
 impl/default_basic_res.go                          |  59 +++++
 impl/migration/migrator.go                         | 125 +++++++++++
 impl/migration/migrator_test.go                    |  86 ++++++++
 .../migration/models.go                            |  15 +-
 .../migrationscripts/20220406_add_frame_tables.go  |  21 +-
 ...o => 20220505_rename_pipeline_step_to_stage.go} |  16 +-
 .../20220601_add_subtasks_field.go                 |  63 ------
 ...s.go => 20220601_add_subtasks_to_task_table.go} |  30 ++-
 ...rint_mode.go => 20220616_add_blueprint_mode.go} |  34 +--
 .../20220622_rename_tasks_to_plan.go               |  36 +--
 ...n_tables.go => 20220707_reset_domain_tables.go} |  26 ++-
 .../20220711_add_subtasks_table.go                 |  21 +-
 .../20220722_commitfile_component.go               |  44 ++--
 .../20220725_add_project_mapping_table.go          |   9 +-
 models/migrationscripts/20220727_remove_notes.go   |   9 +-
 ...220729_rename_columns_of_pull_request_issues.go |  18 +-
 ... => 20220801_add_nopkmodel_to_commit_parent.go} |  14 +-
 ...818_add_cicd.go => 20220818_add_cicd_tables.go} |  57 ++---
 ...6_rename_columns_of_pr_comment_issue_comment.go |  19 +-
 .../20220829_modify_tables_for_dora.go             |  54 ++---
 .../20220830_add_type_field_in_board.go            |  29 +--
 .../migrationscripts/20220903_encrypt_blueprint.go | 138 +++++-------
 .../migrationscripts/20220904_encrypt_pipeline.go  | 128 +++++------
 .../20220905_modfiy_cicd_pipeline.go               |  70 ++----
 .../migrationscripts/20220908_modfiy_cicd_tasks.go |  30 ++-
 .../20220913_add_origin_value_for_pr.go            |  50 ++---
 ...20220913_add_raw_data_origin_to_board_repos.go} |  33 ++-
 .../20220913_commitfile_add_length.go              | 244 ---------------------
 .../20220913_fix_commitfile_id_toolong.go          | 114 ++++++++++
 .../20220915_rename_pipeline_commits.go            |  37 ++--
 .../20220918_commit_line_change.go                 |  21 +-
 models/migrationscripts/20220927_add_snapshot.go   |  17 +-
 .../20220929_modify_lead_time_minutes.go           |  47 ++--
 models/migrationscripts/register.go                |  32 +--
 .../core/{plugin_db_migration.go => basic_res.go}  |  11 +-
 plugins/core/dal/dal.go                            |  35 +--
 ...{plugin_db_migration.go => plugin_migration.go} |  25 ++-
 plugins/core/plugin_task.go                        |  10 -
 plugins/customize/api/api.go                       |   2 +-
 plugins/github/e2e/pr_commit_test.go               |   1 +
 plugins/github/impl/impl.go                        |   3 +-
 plugins/helper/batch_save.go                       |  14 +-
 plugins/helper/default_task_context.go             |   5 +-
 runner/directrun.go                                |  11 +-
 runner/loader.go                                   |   3 -
 runner/migration.go                                |  29 ++-
 scripts/pm/gitlab/pipelines-list.sh                |  25 +++
 services/init.go                                   |  49 +++--
 52 files changed, 1246 insertions(+), 1132 deletions(-)

diff --git a/.env.example b/.env.example
index 42cbdd9a..911862b7 100644
--- a/.env.example
+++ b/.env.example
@@ -29,6 +29,7 @@ TEMPORAL_TASK_QUEUE=
 LOGGING_LEVEL=
 LOGGING_DIR=
 ENABLE_STACKTRACE=false
+FORCE_MIGRATION=false
 
 ##########################
 # Sensitive information encryption key
@@ -38,4 +39,4 @@ ENCODE_KEY=
 ##########################
 # Set if skip verify and connect with out trusted certificate when use https
 ##########################
-IN_SECURE_SKIP_VERIFY=
\ No newline at end of file
+IN_SECURE_SKIP_VERIFY=
diff --git a/helpers/migrationhelper/helper.go 
b/helpers/migrationhelper/helper.go
deleted file mode 100644
index 2191a1c9..00000000
--- a/helpers/migrationhelper/helper.go
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package migrationhelper
-
-import (
-       "fmt"
-       "reflect"
-
-       "github.com/apache/incubator-devlake/errors"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
-       "github.com/apache/incubator-devlake/plugins/gitlab/api"
-       "github.com/apache/incubator-devlake/plugins/helper"
-       "gorm.io/gorm/schema"
-)
-
-// TransformRowsInPlace method can be used when we need to change the table 
structure and reprocess all the data in the table.
-func TransformRowsInPlace(db dal.Dal, src schema.Tabler, bak schema.Tabler, 
dst schema.Tabler, callback_transform func(src schema.Tabler) schema.Tabler) 
(errs errors.Error) {
-       var err error
-
-       // param cheking
-       errs = paramCheckingForTransformRowsInPlace(db, src, bak, dst, 
callback_transform)
-       if errs != nil {
-               return errors.Default.Wrap(errs, "TransformRowsInPlace param 
cheking error")
-       }
-
-       // rename the src to bak for cache src table
-       err = db.RenameTable(src, bak)
-       if err != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("error no rename 
[%s] to [%s]", src.TableName(), bak.TableName()))
-       }
-
-       // rollback for rename back
-       defer func() {
-               if errs != nil {
-                       err = db.RenameTable(bak, src)
-                       if err != nil {
-                               errs = errors.Default.Wrap(err, 
fmt.Sprintf("fail to rollback table [%s] , you must to rollback by yourself. 
%s", bak.TableName(), err.Error()))
-                       }
-               }
-       }()
-
-       return TransformRowsBetweenTables(db, bak, dst, callback_transform)
-}
-
-// TransformRowsBetweenTables method can be used when we need to change the 
table structure and reprocess all the data in the table.
-// It request the src table and the dst table with different table name.
-func TransformRowsBetweenTables(db dal.Dal, src schema.Tabler, dst 
schema.Tabler, callback_transform func(src schema.Tabler) schema.Tabler) (errs 
errors.Error) {
-       var err error
-
-       errs = paramCheckingForTransformRowsBetweenTables(db, src, dst, 
callback_transform)
-       if errs != nil {
-               return errors.Default.Wrap(errs, "TransformRowsBetweenTables 
param cheking error")
-       }
-
-       // create new commit_files table
-       err = db.AutoMigrate(dst)
-       if err != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("error on auto 
migrate [%s]", dst.TableName()))
-       }
-
-       // rollback for create new table
-       defer func() {
-               if errs != nil {
-                       err = db.DropTable(dst)
-                       if err != nil {
-                               errs = errors.Default.Wrap(err, 
fmt.Sprintf("fail to rollback table [%s] , you must to rollback by yourself. 
%s", dst.TableName(), err.Error()))
-                       }
-               }
-       }()
-
-       // update src id to dst id and write to the dst table
-       cursor, err := db.Cursor(
-               dal.From(src.TableName()),
-       )
-       if err != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("error on select 
[%s]]", src.TableName()))
-       }
-       defer cursor.Close()
-
-       // caculate and save the data to new table
-       batch, err := helper.NewBatchSave(api.BasicRes, reflect.TypeOf(dst), 
200)
-       if err != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("error getting 
batch from table [%s]", dst.TableName()))
-       }
-
-       defer batch.Close()
-       for cursor.Next() {
-               err = db.Fetch(cursor, src)
-               if err != nil {
-                       return errors.Default.Wrap(err, fmt.Sprintf("error scan 
rows from table [%s]", src.TableName()))
-               }
-
-               cf := callback_transform(src)
-
-               err = batch.Add(&cf)
-               if err != nil {
-                       return errors.Default.Wrap(err, fmt.Sprintf("error on 
[%s] batch add", dst.TableName()))
-               }
-       }
-
-       // drop the src table
-       err = db.DropTable(src)
-       if err != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("error no drop 
[%s]", src.TableName()))
-       }
-
-       return nil
-}
-
-// paramCheckingForTransformRowsInPlace check the params of 
TransformRowsInPlace
-func paramCheckingForTransformRowsInPlace(db dal.Dal, src schema.Tabler, bak 
schema.Tabler, dst schema.Tabler, callback_transform func(src schema.Tabler) 
schema.Tabler) (errs errors.Error) {
-       errs = paramCheckingShare(db, dst, callback_transform)
-       if errs != nil {
-               return errs
-       }
-
-       if src == nil {
-               return errors.Default.New("can not working with param src nil")
-       }
-
-       if bak == nil {
-               return errors.Default.New("can not working with param bak nil")
-       }
-
-       if dst.TableName() == bak.TableName() {
-               return errors.Default.New(fmt.Sprintf("the bak and dst can not 
use the same table name [%s][%s].",
-                       bak.TableName(), dst.TableName()))
-       }
-
-       if src.TableName() == bak.TableName() {
-               return errors.Default.New(fmt.Sprintf("the src and bak can not 
use the same table name [%s][%s].",
-                       src.TableName(), bak.TableName()))
-       }
-
-       return nil
-}
-
-// paramCheckingForTransformRowsBetweenTables check the params of 
ReBuildTableWithOutBak
-func paramCheckingForTransformRowsBetweenTables(db dal.Dal, src schema.Tabler, 
dst schema.Tabler, callback_transform func(src schema.Tabler) schema.Tabler) 
(errs errors.Error) {
-       errs = paramCheckingShare(db, dst, callback_transform)
-       if errs != nil {
-               return errs
-       }
-
-       if src == nil {
-               return errors.Default.New("can not working with param src nil")
-       }
-
-       if src.TableName() == dst.TableName() {
-               return errors.Default.New(fmt.Sprintf("src and dst can not use 
the same table name [%s][%s].",
-                       src.TableName(), dst.TableName()))
-       }
-
-       return nil
-}
-
-// paramCheckingShare check the Share part params of 
TransformRowsBetweenTables and TransformRowsInPlace
-func paramCheckingShare(db dal.Dal, dst schema.Tabler, callback_transform 
func(src schema.Tabler) schema.Tabler) (errs errors.Error) {
-       if db == nil {
-               return errors.Default.New("can not working with param db nil")
-       }
-
-       if dst == nil {
-               return errors.Default.New("can not working with param dst nil")
-       }
-
-       if callback_transform == nil {
-               return errors.Default.New("can not working with param 
callback_transform nil")
-       }
-
-       return nil
-}
diff --git a/helpers/migrationhelper/migrationhelper.go 
b/helpers/migrationhelper/migrationhelper.go
new file mode 100644
index 00000000..f8f67acb
--- /dev/null
+++ b/helpers/migrationhelper/migrationhelper.go
@@ -0,0 +1,138 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationhelper
+
+import (
+       "crypto/md5"
+       "encoding/hex"
+       "fmt"
+       "reflect"
+       "strings"
+
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
+       "github.com/apache/incubator-devlake/plugins/helper"
+)
+
+// AutoMigrateTables runs AutoMigrate for muliple tables
+func AutoMigrateTables(basicRes core.BasicRes, dst ...interface{}) 
errors.Error {
+       db := basicRes.GetDal()
+       for _, entity := range dst {
+               err := db.AutoMigrate(entity)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+// TransformTable can be used when we need to change the table structure and 
reprocess all the data in the table.
+func TransformTable[S any, D any](
+       basicRes core.BasicRes,
+       script core.MigrationScript,
+       tableName string,
+       transform func(*S) (*D, errors.Error),
+) (err errors.Error) {
+       db := basicRes.GetDal()
+       tmpTableName := fmt.Sprintf("%s_%s", tableName, hashScript(script))
+
+       // rename the src to tmp in case of failure
+       err = db.RenameTable(tableName, tmpTableName)
+       if err != nil {
+               return errors.Default.Wrap(
+                       err,
+                       fmt.Sprintf("failed to rename rename src table [%s] to 
[%s]", tableName, tmpTableName),
+               )
+       }
+       // rollback for error
+       defer func() {
+               if err != nil {
+                       err = db.RenameTable(tmpTableName, tableName)
+                       if err != nil {
+                               msg := fmt.Sprintf(
+                                       "fail to rollback table [%s] to [%s], 
you may have to do it manually",
+                                       tmpTableName,
+                                       tableName,
+                               )
+                               err = errors.Default.Wrap(err, msg)
+                       }
+               }
+       }()
+
+       // create new table with the same name
+       err = db.AutoMigrate(new(D), dal.From(tableName))
+       if err != nil {
+               return errors.Default.Wrap(err, fmt.Sprintf("error on auto 
migrate [%s]", tableName))
+       }
+       // rollback for error
+       defer func() {
+               if err != nil {
+                       err = db.DropTables(tableName)
+                       if err != nil {
+                               msg := fmt.Sprintf(
+                                       "fail to drop table [%s], you may have 
to do it manually",
+                                       tableName,
+                               )
+                               err = errors.Default.Wrap(err, msg)
+                       }
+               }
+       }()
+
+       // transform data from temp table to new table
+       cursor, err := db.Cursor(dal.From(tmpTableName))
+       if err != nil {
+               return errors.Default.Wrap(err, fmt.Sprintf("failed to load 
data from src table [%s]", tmpTableName))
+       }
+       defer cursor.Close()
+       batch, err := helper.NewBatchSave(basicRes, reflect.TypeOf(new(D)), 
200, tableName)
+       if err != nil {
+               return errors.Default.Wrap(err, fmt.Sprintf("failed to 
instantiate BatchSave for table [%s]", tableName))
+       }
+       defer batch.Close()
+       src := new(S)
+       for cursor.Next() {
+               err = db.Fetch(cursor, src)
+               if err != nil {
+                       return errors.Default.Wrap(err, fmt.Sprintf("fail to 
load record from table [%s]", tmpTableName))
+               }
+               dst, err := transform(src)
+               if err != nil {
+                       return errors.Default.Wrap(err, fmt.Sprintf("failed to 
transform row %v", src))
+               }
+               err = batch.Add(dst)
+               if err != nil {
+                       return errors.Default.Wrap(err, fmt.Sprintf("push to 
BatchSave failed %v", dst))
+               }
+       }
+
+       // drop the temp table: we can safely ignore the error because it 
doesn't matter and there will be nothing we
+       // can do in terms of rollback or anything in that nature.
+       _ = db.DropTables(tmpTableName)
+
+       return err
+}
+
+func hashScript(script core.MigrationScript) string {
+       hasher := md5.New()
+       _, err := hasher.Write([]byte(fmt.Sprintf("%s:%v", script.Name(), 
script.Version())))
+       if err != nil {
+               panic(err)
+       }
+       return strings.ToUpper(hex.EncodeToString(hasher.Sum(nil)))
+}
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 003c565f..1ffb3e27 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -28,7 +28,6 @@ import (
        "github.com/apache/incubator-devlake/utils"
        "gorm.io/gorm"
        "gorm.io/gorm/clause"
-       "gorm.io/gorm/schema"
 )
 
 // Dalgorm FIXME ...
@@ -82,7 +81,12 @@ func (d *Dalgorm) Exec(query string, params ...interface{}) 
errors.Error {
 
 // AutoMigrate runs auto migration for given models
 func (d *Dalgorm) AutoMigrate(entity interface{}, clauses ...dal.Clause) 
errors.Error {
-       return errors.Convert(buildTx(d.db, clauses).AutoMigrate(entity))
+       err := errors.Convert(buildTx(d.db, clauses).AutoMigrate(entity))
+       if err == nil {
+               // fix pg cache plan error
+               _ = d.db.Limit(1).Find(entity)
+       }
+       return err
 }
 
 // Cursor returns a database cursor, cursor is especially useful when handling 
big amount of rows of data
@@ -147,13 +151,21 @@ func (d *Dalgorm) Delete(entity interface{}, clauses 
...dal.Clause) errors.Error
        return errors.Convert(buildTx(d.db, clauses).Delete(entity).Error)
 }
 
-// UpdateColumns batch records in database
+// UpdateColumn allows you to update mulitple records
+func (d *Dalgorm) UpdateColumn(entity interface{}, columnName string, value 
interface{}, clauses ...dal.Clause) errors.Error {
+       if expr, ok := value.(dal.DalClause); ok {
+               value = gorm.Expr(expr.Expr, expr.Params...)
+       }
+       return errors.Convert(buildTx(d.db, 
clauses).Model(entity).Update(columnName, value).Error)
+}
+
+// UpdateColumns allows you to update multiple columns of mulitple records
 func (d *Dalgorm) UpdateColumns(entity interface{}, clauses ...dal.Clause) 
errors.Error {
        return errors.Convert(buildTx(d.db, 
clauses).UpdateColumns(entity).Error)
 }
 
 // GetColumns FIXME ...
-func (d *Dalgorm) GetColumns(dst schema.Tabler, filter func(columnMeta 
dal.ColumnMeta) bool) (cms []dal.ColumnMeta, _ errors.Error) {
+func (d *Dalgorm) GetColumns(dst dal.Tabler, filter func(columnMeta 
dal.ColumnMeta) bool) (cms []dal.ColumnMeta, _ errors.Error) {
        columnTypes, err := d.db.Migrator().ColumnTypes(dst.TableName())
        if err != nil {
                return nil, errors.Convert(err)
@@ -178,14 +190,21 @@ func (d *Dalgorm) AddColumn(table, columnName, columnType 
string) errors.Error {
        return d.Exec("ALTER TABLE ? ADD ? ?", clause.Table{Name: table}, 
clause.Column{Name: columnName}, clause.Expr{SQL: columnType})
 }
 
-// DropColumn drop one column from the table
-func (d *Dalgorm) DropColumn(table, columnName string) errors.Error {
+// DropColumns drop one column from the table
+func (d *Dalgorm) DropColumns(table string, columnNames ...string) 
errors.Error {
        // work around the error `cached plan must not change result type` for 
postgres
        // wrap in func(){} to make the linter happy
        defer func() {
                _ = d.Exec("SELECT * FROM ? LIMIT 1", clause.Table{Name: table})
        }()
-       return d.Exec("ALTER TABLE ? DROP COLUMN ?", clause.Table{Name: table}, 
clause.Column{Name: columnName})
+       for _, columnName := range columnNames {
+               err := d.Exec("ALTER TABLE ? DROP COLUMN ?", clause.Table{Name: 
table}, clause.Column{Name: columnName})
+               // err := d.db.Migrator().DropColumn(table, columnName)
+               if err != nil {
+                       return errors.Convert(err)
+               }
+       }
+       return nil
 }
 
 // GetPrimaryKeyFields get the PrimaryKey from `gorm` tag
@@ -195,6 +214,21 @@ func (d *Dalgorm) GetPrimaryKeyFields(t reflect.Type) 
[]reflect.StructField {
        })
 }
 
+// RenameColumn renames column name for specified table
+func (d *Dalgorm) RenameColumn(table, oldColumnName, newColumnName string) 
errors.Error {
+       // work around the error `cached plan must not change result type` for 
postgres
+       // wrap in func(){} to make the linter happy
+       defer func() {
+               _ = d.Exec("SELECT * FROM ? LIMIT 1", clause.Table{Name: table})
+       }()
+       return d.Exec(
+               "ALTER TABLE ? RENAME COLUMN ? TO ?",
+               clause.Table{Name: table},
+               clause.Column{Name: oldColumnName},
+               clause.Column{Name: newColumnName},
+       )
+}
+
 // AllTables returns all tables in the database
 func (d *Dalgorm) AllTables() ([]string, errors.Error) {
        var tableSql string
@@ -217,25 +251,28 @@ func (d *Dalgorm) AllTables() ([]string, errors.Error) {
        return filteredTables, nil
 }
 
-// NewDalgorm FIXME ...
-func NewDalgorm(db *gorm.DB) *Dalgorm {
-       return &Dalgorm{db}
+// DropTables drop multiple tables by Model Pointer or Table Name
+func (d *Dalgorm) DropTables(dst ...interface{}) errors.Error {
+       return errors.Convert(d.db.Migrator().DropTable(dst...))
 }
 
-// RenameTable rename the oldName table to newName
-func (d *Dalgorm) RenameTable(oldName interface{}, newName interface{}) 
errors.Error {
-       err := d.db.Migrator().RenameTable(oldName, newName)
-       if err != nil {
-               return errors.Default.New(err.Error())
-       }
-       return nil
+// RenameTable renames table name
+func (d *Dalgorm) RenameTable(oldName, newName string) errors.Error {
+       return errors.Convert(d.db.Migrator().RenameTable(oldName, newName))
 }
 
-// DropTable drop the table
-func (d *Dalgorm) DropTable(dst ...interface{}) errors.Error {
-       err := d.db.Migrator().DropTable(dst)
-       if err != nil {
-               return errors.Default.New(err.Error())
+// DropIndexes drops indexes for specified table
+func (d *Dalgorm) DropIndexes(table string, indexNames ...string) errors.Error 
{
+       for _, indexName := range indexNames {
+               err := d.db.Migrator().DropIndex(table, indexName)
+               if err != nil {
+                       return errors.Convert(err)
+               }
        }
        return nil
 }
+
+// NewDalgorm FIXME ...
+func NewDalgorm(db *gorm.DB) *Dalgorm {
+       return &Dalgorm{db}
+}
diff --git a/impl/default_basic_res.go b/impl/default_basic_res.go
new file mode 100644
index 00000000..8b523333
--- /dev/null
+++ b/impl/default_basic_res.go
@@ -0,0 +1,59 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package impl
+
+import (
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
+       "github.com/spf13/viper"
+)
+
+// DefaultBasicRes offers a common implementation for the  BasisRes interface
+type DefaultBasicRes struct {
+       cfg    *viper.Viper
+       logger core.Logger
+       db     dal.Dal
+}
+
+// GetConfig returns the value of the specificed name
+func (c *DefaultBasicRes) GetConfig(name string) string {
+       return c.cfg.GetString(name)
+}
+
+// GetDal returns the Dal instance
+func (c *DefaultBasicRes) GetDal() dal.Dal {
+       return c.db
+}
+
+// GetLogger returns the Logger instance
+func (c *DefaultBasicRes) GetLogger() core.Logger {
+       return c.logger
+}
+
+// NewDefaultBasicRes creates a new DefaultBasicRes instance
+func NewDefaultBasicRes(
+       cfg *viper.Viper,
+       logger core.Logger,
+       db dal.Dal,
+) *DefaultBasicRes {
+       return &DefaultBasicRes{
+               cfg:    cfg,
+               logger: logger,
+               db:     db,
+       }
+}
diff --git a/impl/migration/migrator.go b/impl/migration/migrator.go
new file mode 100644
index 00000000..3b75943c
--- /dev/null
+++ b/impl/migration/migrator.go
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migration
+
+import (
+       "fmt"
+       "sort"
+       "sync"
+
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/plugins/core"
+)
+
+type scriptWithComment struct {
+       script  core.MigrationScript
+       comment string
+}
+
+type migratorImpl struct {
+       sync.Mutex
+       basicRes core.BasicRes
+       executed map[string]bool
+       scripts  []*scriptWithComment
+       pending  []*scriptWithComment
+}
+
+func (m *migratorImpl) loadExecuted() errors.Error {
+       db := m.basicRes.GetDal()
+       // make sure migration_history table exists
+       err := db.AutoMigrate(&MigrationHistory{})
+       if err != nil {
+               return errors.Default.Wrap(err, "error performing migrations")
+       }
+       // load executed scripts into memory
+       m.executed = make(map[string]bool)
+       var records []MigrationHistory
+       err = db.All(&records)
+       if err != nil {
+               return errors.Default.Wrap(err, "error finding migration 
history records")
+       }
+       for _, record := range records {
+               m.executed[fmt.Sprintf("%s:%d", record.ScriptName, 
record.ScriptVersion)] = true
+       }
+       return nil
+}
+
+// Register a MigrationScript to the Migrator with comment message
+func (m *migratorImpl) Register(scripts []core.MigrationScript, comment 
string) {
+       m.Lock()
+       defer m.Unlock()
+       for _, script := range scripts {
+               key := fmt.Sprintf("%s:%d", script.Name(), script.Version())
+               swc := &scriptWithComment{
+                       script:  script,
+                       comment: comment,
+               }
+               m.scripts = append(m.scripts, swc)
+               if !m.executed[key] {
+                       m.pending = append(m.pending, swc)
+               }
+       }
+}
+
+// Execute all registered migration script in order and mark them as executed 
in migration_history table
+func (m *migratorImpl) Execute() errors.Error {
+       // sort the scripts by version
+       sort.Slice(m.pending, func(i, j int) bool {
+               return m.pending[i].script.Version() < 
m.pending[j].script.Version()
+       })
+       // execute them one by one
+       db := m.basicRes.GetDal()
+       log := m.basicRes.GetLogger().Nested("migrator")
+       for _, swc := range m.pending {
+               scriptId := fmt.Sprintf("%d-%s", swc.script.Version(), 
swc.script.Name())
+               log.Info("applying migratin script %s", scriptId)
+               err := swc.script.Up(m.basicRes)
+               if err != nil {
+                       return err
+               }
+               err = db.Create(&MigrationHistory{
+                       ScriptVersion: swc.script.Version(),
+                       ScriptName:    swc.script.Name(),
+                       Comment:       swc.comment,
+               })
+               if err != nil {
+                       return errors.Default.Wrap(err, fmt.Sprintf("failed to 
execute migration script %s", scriptId))
+               }
+               m.executed[scriptId] = true
+               m.pending = m.pending[1:]
+       }
+       return nil
+}
+
+// HasPendingScripts returns if there is any pending migration scripts
+func (m *migratorImpl) HasPendingScripts() bool {
+       return len(m.executed) > 0 && len(m.pending) > 0
+}
+
+// NewMigrator returns a new Migrator instance, which
+// implemented based on migration_history from the same database
+func NewMigrator(basicRes core.BasicRes) (core.Migrator, errors.Error) {
+       m := &migratorImpl{
+               basicRes: basicRes,
+       }
+       err := m.loadExecuted()
+       if err != nil {
+               return nil, err
+       }
+       return m, nil
+}
diff --git a/impl/migration/migrator_test.go b/impl/migration/migrator_test.go
new file mode 100644
index 00000000..85091f22
--- /dev/null
+++ b/impl/migration/migrator_test.go
@@ -0,0 +1,86 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migration
+
+import (
+       "testing"
+       "time"
+
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/helpers/unithelper"
+       "github.com/apache/incubator-devlake/impl"
+       "github.com/apache/incubator-devlake/mocks"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
+       "github.com/spf13/viper"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/mock"
+)
+
+func TestHasPendingScripts(t *testing.T) {
+       // simulate db reaction
+       mockDal := new(mocks.Dal)
+       mockDal.On("AutoMigrate", mock.Anything, 
mock.Anything).Return(nil).Once()
+       mockDal.On("All", mock.Anything, mock.Anything).Return(func(i 
interface{}, _ ...dal.Clause) errors.Error {
+               precords := i.(*[]MigrationHistory)
+               *precords = []MigrationHistory{
+                       {ScriptName: "A", ScriptVersion: 1, Comment: "UniTest", 
CreatedAt: time.Now()},
+                       {ScriptName: "B", ScriptVersion: 2, Comment: "UniTest", 
CreatedAt: time.Now()},
+                       {ScriptName: "C", ScriptVersion: 3, Comment: "UniTest", 
CreatedAt: time.Now()},
+               }
+               return nil
+       }).Once()
+       mockDal.On("Create", &MigrationHistory{
+               ScriptName:    "E",
+               ScriptVersion: 4,
+               Comment:       "UnitTest",
+       }, mock.Anything).Return(nil).Once()
+       mockDal.On("Create", &MigrationHistory{
+               ScriptName:    "D",
+               ScriptVersion: 5,
+               Comment:       "UnitTest",
+       }, mock.Anything).Return(nil).Once()
+
+       // migrator initialization
+       basicRes := impl.NewDefaultBasicRes(viper.New(), 
unithelper.DummyLogger(), mockDal)
+       migrator, err := NewMigrator(basicRes)
+       assert.Nil(t, err)
+
+       // assuming we have 2 new scripts
+       scriptD := new(mocks.MigrationScript)
+       scriptD.On("Up", mock.Anything).Return(nil).Once()
+       scriptD.On("Version").Return(uint64(5))
+       scriptD.On("Name").Return("D")
+       scriptE := new(mocks.MigrationScript)
+       scriptE.On("Up", mock.Anything).Return(nil).Once()
+       scriptE.On("Version").Return(uint64(4))
+       scriptE.On("Name").Return("E")
+       migrator.Register([]core.MigrationScript{scriptD, scriptE}, "UnitTest")
+
+       // we should have pending scripts
+       assert.True(t, migrator.HasPendingScripts())
+
+       // lets try migrating
+       assert.Nil(t, migrator.Execute())
+
+       // should not be any pending scripts anymore
+       assert.False(t, migrator.HasPendingScripts())
+
+       // make sure all method got called
+       mockDal.AssertExpectations(t)
+}
diff --git a/plugins/core/plugin_db_migration.go b/impl/migration/models.go
similarity index 72%
copy from plugins/core/plugin_db_migration.go
copy to impl/migration/models.go
index 005aca3d..b15bf2f8 100644
--- a/plugins/core/plugin_db_migration.go
+++ b/impl/migration/models.go
@@ -15,10 +15,17 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package core
+package migration
 
-import "github.com/apache/incubator-devlake/migration"
+import "time"
 
-type Migratable interface {
-       MigrationScripts() []migration.Script
+type MigrationHistory struct {
+       CreatedAt     time.Time
+       ScriptVersion uint64 `gorm:"primarykey"`
+       ScriptName    string `gorm:"primarykey;type:varchar(255)"`
+       Comment       string
+}
+
+func (MigrationHistory) TableName() string {
+       return "_devlake_migration_history"
 }
diff --git a/models/migrationscripts/20220406_add_frame_tables.go 
b/models/migrationscripts/20220406_add_frame_tables.go
index 214fabb9..3e014b54 100644
--- a/models/migrationscripts/20220406_add_frame_tables.go
+++ b/models/migrationscripts/20220406_add_frame_tables.go
@@ -18,32 +18,35 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
+       "github.com/apache/incubator-devlake/plugins/core"
 
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
 )
 
-type addFrameTables struct{}
+var _ core.MigrationScript = (*addFrameworkTables)(nil)
 
-func (*addFrameTables) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       return errors.Convert(db.Migrator().AutoMigrate(
+type addFrameworkTables struct{}
+
+func (*addFrameworkTables) Up(basicRes core.BasicRes) errors.Error {
+       return migrationhelper.AutoMigrateTables(
+               basicRes,
                &archived.Task{},
                &archived.Notification{},
                &archived.Pipeline{},
                &archived.Blueprint{},
-       ))
+       )
 }
 
-func (*addFrameTables) Version() uint64 {
+func (*addFrameworkTables) Version() uint64 {
        return 20220406212344
 }
 
-func (*addFrameTables) Owner() string {
+func (*addFrameworkTables) Owner() string {
        return "Framework"
 }
 
-func (*addFrameTables) Name() string {
+func (*addFrameworkTables) Name() string {
        return "create init schemas"
 }
diff --git a/models/migrationscripts/20220505_rename_step_to_stage.go 
b/models/migrationscripts/20220505_rename_pipeline_step_to_stage.go
similarity index 68%
rename from models/migrationscripts/20220505_rename_step_to_stage.go
rename to models/migrationscripts/20220505_rename_pipeline_step_to_stage.go
index 54617264..79d10616 100644
--- a/models/migrationscripts/20220505_rename_step_to_stage.go
+++ b/models/migrationscripts/20220505_rename_pipeline_step_to_stage.go
@@ -18,22 +18,22 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
-       "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
-type renameStepToStage struct{}
+var _ core.MigrationScript = (*renamePipelineStepToStage)(nil)
 
-func (*renameStepToStage) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       return errors.Convert(db.Migrator().RenameColumn(archived.Pipeline{}, 
"step", "stage"))
+type renamePipelineStepToStage struct{}
+
+func (*renamePipelineStepToStage) Up(basicRes core.BasicRes) errors.Error {
+       return basicRes.GetDal().RenameColumn("_devlake_pipelines", "step", 
"stage")
 }
 
-func (*renameStepToStage) Version() uint64 {
+func (*renamePipelineStepToStage) Version() uint64 {
        return 20220505212344
 }
 
-func (*renameStepToStage) Name() string {
+func (*renamePipelineStepToStage) Name() string {
        return "Rename step to stage "
 }
diff --git a/models/migrationscripts/20220601_add_subtasks_field.go 
b/models/migrationscripts/20220601_add_subtasks_field.go
deleted file mode 100644
index e49ee2b7..00000000
--- a/models/migrationscripts/20220601_add_subtasks_field.go
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package migrationscripts
-
-import (
-       "context"
-       "github.com/apache/incubator-devlake/errors"
-       "time"
-
-       "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/datatypes"
-       "gorm.io/gorm"
-)
-
-type Task20220601 struct {
-       archived.Model
-       Plugin        string         `json:"plugin" gorm:"index"`
-       Subtasks      datatypes.JSON `json:"subtasks"`
-       Options       datatypes.JSON `json:"options"`
-       Status        string         `json:"status"`
-       Message       string         `json:"message"`
-       Progress      float32        `json:"progress"`
-       FailedSubTask string         `json:"failedSubTask"`
-       PipelineId    uint64         `json:"pipelineId" gorm:"index"`
-       PipelineRow   int            `json:"pipelineRow"`
-       PipelineCol   int            `json:"pipelineCol"`
-       BeganAt       *time.Time     `json:"beganAt"`
-       FinishedAt    *time.Time     `json:"finishedAt" gorm:"index"`
-       SpentSeconds  int            `json:"spentSeconds"`
-}
-
-func (Task20220601) TableName() string {
-       return "_devlake_tasks"
-}
-
-type addSubtasksField struct{}
-
-func (*addSubtasksField) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       return errors.Convert(db.Migrator().AddColumn(Task20220601{}, 
"subtasks"))
-}
-
-func (*addSubtasksField) Version() uint64 {
-       return 20220601000005
-}
-
-func (*addSubtasksField) Name() string {
-       return "add column `subtasks` at _devlake_tasks"
-}
diff --git a/models/migrationscripts/20220727_remove_notes.go 
b/models/migrationscripts/20220601_add_subtasks_to_task_table.go
similarity index 59%
copy from models/migrationscripts/20220727_remove_notes.go
copy to models/migrationscripts/20220601_add_subtasks_to_task_table.go
index e73ca331..e5253041 100644
--- a/models/migrationscripts/20220727_remove_notes.go
+++ b/models/migrationscripts/20220601_add_subtasks_to_task_table.go
@@ -18,22 +18,32 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
+       "encoding/json"
+
        "github.com/apache/incubator-devlake/errors"
-       "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
-type removeNotes struct{}
+var _ core.MigrationScript = (*addSubtaskToTaskTable)(nil)
+
+type tasks20220601 struct {
+       Subtasks json.RawMessage `json:"subtasks"`
+}
+
+func (tasks20220601) TableName() string {
+       return "_devlake_tasks"
+}
+
+type addSubtaskToTaskTable struct{}
 
-func (*removeNotes) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       return errors.Convert(db.Migrator().DropTable(archived.Note{}))
+func (*addSubtaskToTaskTable) Up(basicRes core.BasicRes) errors.Error {
+       return basicRes.GetDal().AutoMigrate(&tasks20220601{})
 }
 
-func (*removeNotes) Version() uint64 {
-       return 20220727165805
+func (*addSubtaskToTaskTable) Version() uint64 {
+       return 20220601000005
 }
 
-func (*removeNotes) Name() string {
-       return "remove notes from domain layer"
+func (*addSubtaskToTaskTable) Name() string {
+       return "add column `subtasks` at _devlake_tasks"
 }
diff --git a/models/migrationscripts/20220616_update_blueprint_mode.go 
b/models/migrationscripts/20220616_add_blueprint_mode.go
similarity index 59%
rename from models/migrationscripts/20220616_update_blueprint_mode.go
rename to models/migrationscripts/20220616_add_blueprint_mode.go
index 2121efd1..3d7cd319 100644
--- a/models/migrationscripts/20220616_update_blueprint_mode.go
+++ b/models/migrationscripts/20220616_add_blueprint_mode.go
@@ -18,37 +18,45 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
-
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
-type Blueprint20220616 struct {
+var _ core.MigrationScript = (*addBlueprintMode)(nil)
+
+type blueprint20220616 struct {
        Mode     string `json:"mode" gorm:"varchar(20)" 
validate:"required,oneof=NORMAL ADVANCED"`
        IsManual bool   `json:"isManual"`
 }
 
-func (Blueprint20220616) TableName() string {
+func (blueprint20220616) TableName() string {
        return "_devlake_blueprints"
 }
 
-type updateBlueprintMode struct{}
+type addBlueprintMode struct{}
 
-func (*updateBlueprintMode) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().AutoMigrate(&Blueprint20220616{})
+func (*addBlueprintMode) Up(basicRes core.BasicRes) errors.Error {
+       db := basicRes.GetDal()
+       err := db.AutoMigrate(&blueprint20220616{})
+       if err != nil {
+               return err
+       }
+       err = db.UpdateColumn(&blueprint20220616{}, "mode", "ADVANCED", 
dal.Where("mode is null"))
+       if err != nil {
+               return err
+       }
+       err = db.UpdateColumn(&blueprint20220616{}, "is_manual", false, 
dal.Where("is_manual is null"))
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       db.Model(&Blueprint20220616{}).Where("mode is null").Update("mode", 
"ADVANCED")
-       db.Model(&Blueprint20220616{}).Where("is_manual is 
null").Update("is_manual", false)
        return nil
 }
 
-func (*updateBlueprintMode) Version() uint64 {
+func (*addBlueprintMode) Version() uint64 {
        return 20220616110537
 }
 
-func (*updateBlueprintMode) Name() string {
+func (*addBlueprintMode) Name() string {
        return "add mode field to blueprint"
 }
diff --git a/models/migrationscripts/20220622_rename_tasks_to_plan.go 
b/models/migrationscripts/20220622_rename_tasks_to_plan.go
index b5b8dc7f..36f215c6 100644
--- a/models/migrationscripts/20220622_rename_tasks_to_plan.go
+++ b/models/migrationscripts/20220622_rename_tasks_to_plan.go
@@ -18,45 +18,45 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
-       "github.com/apache/incubator-devlake/errors"
+       "encoding/json"
 
-       "gorm.io/datatypes"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
-// model blueprint
-type blueprintNormalMode_Blueprint struct {
-       Settings datatypes.JSON `json:"settings"`
+var _ core.MigrationScript = (*renameTasksToPlan)(nil)
+
+type blueprint20220622 struct {
+       Settings json.RawMessage `json:"settings"`
 }
 
-func (blueprintNormalMode_Blueprint) TableName() string {
+func (blueprint20220622) TableName() string {
        return "_devlake_blueprints"
 }
 
-// model pipeline
-type blueprintNormalMode_Pipeline struct {
+type pipeline20220622 struct {
 }
 
-func (blueprintNormalMode_Pipeline) TableName() string {
+func (pipeline20220622) TableName() string {
        return "_devlake_pipelines"
 }
 
 // migration script
 type renameTasksToPlan struct{}
 
-func (*renameTasksToPlan) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().AddColumn(blueprintNormalMode_Blueprint{}, 
"settings")
+func (*renameTasksToPlan) Up(basicRes core.BasicRes) errors.Error {
+       db := basicRes.GetDal()
+       err := db.AutoMigrate(blueprint20220622{})
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       err = db.Migrator().RenameColumn(&blueprintNormalMode_Blueprint{}, 
"tasks", "plan")
+       err = db.RenameColumn(blueprint20220622{}.TableName(), "tasks", "plan")
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       err = db.Migrator().RenameColumn(&blueprintNormalMode_Pipeline{}, 
"tasks", "plan")
+       err = db.RenameColumn(pipeline20220622{}.TableName(), "tasks", "plan")
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
        return nil
 }
diff --git a/models/migrationscripts/20220707_add_domain_tables.go 
b/models/migrationscripts/20220707_reset_domain_tables.go
similarity index 84%
rename from models/migrationscripts/20220707_add_domain_tables.go
rename to models/migrationscripts/20220707_reset_domain_tables.go
index 1c139f99..96376ae3 100644
--- a/models/migrationscripts/20220707_add_domain_tables.go
+++ b/models/migrationscripts/20220707_reset_domain_tables.go
@@ -18,17 +18,20 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
+       "github.com/apache/incubator-devlake/plugins/core"
 
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
 )
 
-type addDomainTables struct{}
+var _ core.MigrationScript = (*resetDomainTables)(nil)
 
-func (*addDomainTables) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().DropTable(
+type resetDomainTables struct{}
+
+func (*resetDomainTables) Up(basicRes core.BasicRes) errors.Error {
+       db := basicRes.GetDal()
+       err := db.DropTables(
                "issue_assignee_history",
                "issue_status_history",
                "issue_sprints_history",
@@ -63,10 +66,11 @@ func (*addDomainTables) Up(ctx context.Context, db 
*gorm.DB) errors.Error {
                &archived.RefsPrCherrypick{},
        )
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
 
-       return errors.Convert(db.Migrator().AutoMigrate(
+       return migrationhelper.AutoMigrateTables(
+               basicRes,
                &archived.Repo{},
                &archived.Commit{},
                &archived.CommitParent{},
@@ -100,17 +104,17 @@ func (*addDomainTables) Up(ctx context.Context, db 
*gorm.DB) errors.Error {
                &archived.Team{},
                &archived.UserAccount{},
                &archived.TeamUser{},
-       ))
+       )
 }
 
-func (*addDomainTables) Version() uint64 {
+func (*resetDomainTables) Version() uint64 {
        return 20220707232344
 }
 
-func (*addDomainTables) Owner() string {
+func (*resetDomainTables) Owner() string {
        return "Framework"
 }
 
-func (*addDomainTables) Name() string {
+func (*resetDomainTables) Name() string {
        return "create domain layer init schemas"
 }
diff --git a/models/migrationscripts/20220711_add_subtasks_table.go 
b/models/migrationscripts/20220711_add_subtasks_table.go
index 26ea54a7..eeffdbd3 100644
--- a/models/migrationscripts/20220711_add_subtasks_table.go
+++ b/models/migrationscripts/20220711_add_subtasks_table.go
@@ -18,18 +18,19 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
+       "time"
+
        "github.com/apache/incubator-devlake/errors"
        commonArchived 
"github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
-       "time"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
+var _ core.MigrationScript = (*addSubtasksTable)(nil)
+
 type addSubtasksTable struct {
 }
 
-// Subtask20220711 DB snapshot model of models.Subtask
-type Subtask20220711 struct {
+type subtasks20220711 struct {
        commonArchived.Model
        TaskID       uint64     `json:"task_id" gorm:"index"`
        SubtaskName  string     `json:"name" gorm:"column:name;index"`
@@ -39,18 +40,18 @@ type Subtask20220711 struct {
        SpentSeconds int64      `json:"spentSeconds"`
 }
 
-func (s Subtask20220711) TableName() string {
+func (subtasks20220711) TableName() string {
        return "_devlake_subtasks"
 }
 
-func (u addSubtasksTable) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       return errors.Convert(db.Migrator().AutoMigrate(&Subtask20220711{}))
+func (addSubtasksTable) Up(basicRes core.BasicRes) errors.Error {
+       return basicRes.GetDal().AutoMigrate(&subtasks20220711{})
 }
 
-func (u addSubtasksTable) Version() uint64 {
+func (addSubtasksTable) Version() uint64 {
        return 20220711000001
 }
 
-func (u addSubtasksTable) Name() string {
+func (addSubtasksTable) Name() string {
        return "create subtask schema"
 }
diff --git a/models/migrationscripts/20220722_commitfile_component.go 
b/models/migrationscripts/20220722_commitfile_component.go
index 544c8014..db22158d 100644
--- a/models/migrationscripts/20220722_commitfile_component.go
+++ b/models/migrationscripts/20220722_commitfile_component.go
@@ -18,24 +18,25 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
-
        "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
-type Component struct {
+var _ core.MigrationScript = (*addCommitFileComponent)(nil)
+
+type component20220722 struct {
        RepoId    string `gorm:"type:varchar(255)"`
        Name      string `gorm:"primaryKey;type:varchar(255)"`
        PathRegex string `gorm:"type:varchar(255)"`
 }
 
-func (Component) TableName() string {
+func (component20220722) TableName() string {
        return "components"
 }
 
-type CommitFile struct {
+type commitFile20220722 struct {
        archived.DomainEntity
        CommitSha string `gorm:"type:varchar(40)"`
        FilePath  string `gorm:"type:varchar(255)"`
@@ -43,40 +44,43 @@ type CommitFile struct {
        Deletions int
 }
 
-func (CommitFile) TableName() string {
+func (commitFile20220722) TableName() string {
        return "commit_files"
 }
 
-type CommitFileComponent struct {
+type commitFileComponent20220722 struct {
        archived.NoPKModel
        CommitFileId  string `gorm:"primaryKey;type:varchar(255)"`
        ComponentName string `gorm:"type:varchar(255)"`
 }
 
-func (CommitFileComponent) TableName() string {
+func (commitFileComponent20220722) TableName() string {
        return "commit_file_components"
 }
 
-type commitfileComponent struct{}
+type addCommitFileComponent struct{}
 
-func (*commitfileComponent) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().DropTable(&archived.CommitFile{})
+func (addCommitFileComponent) Up(basicRes core.BasicRes) errors.Error {
+       db := basicRes.GetDal()
+       err := db.DropTables(&archived.CommitFile{})
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       err = db.Migrator().AutoMigrate(Component{}, CommitFile{}, 
CommitFileComponent{})
-       if err != nil {
-               return errors.Convert(err)
-       }
-       return nil
+
+       return migrationhelper.AutoMigrateTables(
+               basicRes,
+               component20220722{},
+               commitFile20220722{},
+               commitFileComponent20220722{},
+       )
 
 }
 
-func (*commitfileComponent) Version() uint64 {
+func (*addCommitFileComponent) Version() uint64 {
        return 20220722165805
 }
 
-func (*commitfileComponent) Name() string {
+func (*addCommitFileComponent) Name() string {
 
        return "add commit_file_components components table,update commit_files 
table"
 }
diff --git a/models/migrationscripts/20220725_add_project_mapping_table.go 
b/models/migrationscripts/20220725_add_project_mapping_table.go
index 7854f35a..880a1cd8 100644
--- a/models/migrationscripts/20220725_add_project_mapping_table.go
+++ b/models/migrationscripts/20220725_add_project_mapping_table.go
@@ -18,16 +18,17 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
+var _ core.MigrationScript = (*addProjectMapping)(nil)
+
 type addProjectMapping struct{}
 
-func (*addProjectMapping) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       return 
errors.Convert(db.Migrator().AutoMigrate(&archived.ProjectMapping{}))
+func (*addProjectMapping) Up(basicRes core.BasicRes) errors.Error {
+       return basicRes.GetDal().AutoMigrate(&archived.ProjectMapping{})
 }
 
 func (*addProjectMapping) Version() uint64 {
diff --git a/models/migrationscripts/20220727_remove_notes.go 
b/models/migrationscripts/20220727_remove_notes.go
index e73ca331..cb4e9ffc 100644
--- a/models/migrationscripts/20220727_remove_notes.go
+++ b/models/migrationscripts/20220727_remove_notes.go
@@ -18,16 +18,17 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
+var _ core.MigrationScript = (*removeNotes)(nil)
+
 type removeNotes struct{}
 
-func (*removeNotes) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       return errors.Convert(db.Migrator().DropTable(archived.Note{}))
+func (*removeNotes) Up(basicRes core.BasicRes) errors.Error {
+       return basicRes.GetDal().DropTables(archived.Note{})
 }
 
 func (*removeNotes) Version() uint64 {
diff --git 
a/models/migrationscripts/20220729_rename_columns_of_pull_request_issues.go 
b/models/migrationscripts/20220729_rename_columns_of_pull_request_issues.go
index 9b04efc8..9b60aba5 100644
--- a/models/migrationscripts/20220729_rename_columns_of_pull_request_issues.go
+++ b/models/migrationscripts/20220729_rename_columns_of_pull_request_issues.go
@@ -18,23 +18,23 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
-       "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
-type renameColumnsOfPullRequestIssue struct{}
+var _ core.MigrationScript = (*renameColumnsOfPullRequestIssue)(nil)
 
-func (*renameColumnsOfPullRequestIssue) Up(ctx context.Context, db *gorm.DB) 
errors.Error {
+type renameColumnsOfPullRequestIssue struct{}
 
-       err := db.Migrator().RenameColumn(&archived.PullRequestIssue{}, 
"pull_request_number", "pull_request_key")
+func (*renameColumnsOfPullRequestIssue) Up(basicRes core.BasicRes) 
errors.Error {
+       db := basicRes.GetDal()
+       err := db.RenameColumn("pull_request_issues", "pull_request_number", 
"pull_request_key")
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       err = db.Migrator().RenameColumn(&archived.PullRequestIssue{}, 
"issue_number", "issue_key")
+       err = db.RenameColumn("pull_request_issues", "issue_number", 
"issue_key")
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
        return nil
 }
diff --git a/models/migrationscripts/20220801_add_NoPKModel_to_CommitParent.go 
b/models/migrationscripts/20220801_add_nopkmodel_to_commit_parent.go
similarity index 80%
rename from models/migrationscripts/20220801_add_NoPKModel_to_CommitParent.go
rename to models/migrationscripts/20220801_add_nopkmodel_to_commit_parent.go
index 760b0648..72e3e27f 100644
--- a/models/migrationscripts/20220801_add_NoPKModel_to_CommitParent.go
+++ b/models/migrationscripts/20220801_add_nopkmodel_to_commit_parent.go
@@ -18,27 +18,27 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
-
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
-type commitParent struct {
+var _ core.MigrationScript = (*addNoPKModelToCommitParent)(nil)
+
+type commitParent20220801 struct {
        archived.NoPKModel
        CommitSha       string `json:"commitSha" 
gorm:"primaryKey;type:varchar(40);comment:commit hash"`
        ParentCommitSha string `json:"parentCommitSha" 
gorm:"primaryKey;type:varchar(40);comment:parent commit hash"`
 }
 
-func (commitParent) TableName() string {
+func (commitParent20220801) TableName() string {
        return "commit_parents"
 }
 
 type addNoPKModelToCommitParent struct{}
 
-func (*addNoPKModelToCommitParent) Up(ctx context.Context, db *gorm.DB) 
errors.Error {
-       return errors.Convert(db.Migrator().AutoMigrate(&commitParent{}))
+func (*addNoPKModelToCommitParent) Up(basicRes core.BasicRes) errors.Error {
+       return basicRes.GetDal().AutoMigrate(&commitParent20220801{})
 }
 
 func (*addNoPKModelToCommitParent) Version() uint64 {
diff --git a/models/migrationscripts/20220818_add_cicd.go 
b/models/migrationscripts/20220818_add_cicd_tables.go
similarity index 76%
rename from models/migrationscripts/20220818_add_cicd.go
rename to models/migrationscripts/20220818_add_cicd_tables.go
index c03cc39d..97eb873f 100644
--- a/models/migrationscripts/20220818_add_cicd.go
+++ b/models/migrationscripts/20220818_add_cicd_tables.go
@@ -18,34 +18,18 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
-       "github.com/apache/incubator-devlake/errors"
        "time"
 
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
+       "github.com/apache/incubator-devlake/plugins/core"
+
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
 )
 
-type addCICD struct{}
-
-func (*addCICD) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().AutoMigrate(
-               &CICDPipelineRepo{},
-               &CICDPipeline{},
-               &CICDTask{},
-       )
-       return errors.Convert(err)
-}
-
-func (*addCICD) Version() uint64 {
-       return 20220818232735
-}
-
-func (*addCICD) Name() string {
-       return "add cicd models"
-}
+var _ core.MigrationScript = (*addCICDTables)(nil)
 
-type CICDPipeline struct {
+type cicdPipeline struct {
        archived.DomainEntity
        Name         string `gorm:"type:varchar(255)"`
        CommitSha    string `gorm:"type:varchar(255);index"`
@@ -59,11 +43,11 @@ type CICDPipeline struct {
        FinishedDate *time.Time
 }
 
-func (CICDPipeline) TableName() string {
+func (cicdPipeline) TableName() string {
        return "cicd_pipelines"
 }
 
-type CICDTask struct {
+type cicdTask struct {
        archived.DomainEntity
        Name         string `gorm:"type:varchar(255)"`
        PipelineId   string `gorm:"index;type:varchar(255)"`
@@ -75,17 +59,36 @@ type CICDTask struct {
        FinishedDate *time.Time
 }
 
-func (CICDTask) TableName() string {
+func (cicdTask) TableName() string {
        return "cicd_tasks"
 }
 
-type CICDPipelineRepo struct {
+type cicdPipelineRepo struct {
        archived.DomainEntity
        CommitSha string `gorm:"primaryKey;type:varchar(255)"`
        Branch    string `gorm:"type:varchar(255)"`
        RepoUrl   string `gorm:"type:varchar(255)"`
 }
 
-func (CICDPipelineRepo) TableName() string {
+func (cicdPipelineRepo) TableName() string {
        return "cicd_pipeline_repos"
 }
+
+type addCICDTables struct{}
+
+func (*addCICDTables) Up(basicRes core.BasicRes) errors.Error {
+       return migrationhelper.AutoMigrateTables(
+               basicRes,
+               &cicdPipelineRepo{},
+               &cicdPipeline{},
+               &cicdTask{},
+       )
+}
+
+func (*addCICDTables) Version() uint64 {
+       return 20220818232735
+}
+
+func (*addCICDTables) Name() string {
+       return "add cicd models"
+}
diff --git 
a/models/migrationscripts/20220826_rename_columns_of_pr_comment_issue_comment.go
 
b/models/migrationscripts/20220826_rename_columns_of_pr_comment_issue_comment.go
index 432a071f..d28e95be 100644
--- 
a/models/migrationscripts/20220826_rename_columns_of_pr_comment_issue_comment.go
+++ 
b/models/migrationscripts/20220826_rename_columns_of_pr_comment_issue_comment.go
@@ -18,24 +18,21 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
-       "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
+var _ core.MigrationScript = (*renameColumnsOfPrCommentIssueComment)(nil)
+
 type renameColumnsOfPrCommentIssueComment struct{}
 
-func (*renameColumnsOfPrCommentIssueComment) Up(ctx context.Context, db 
*gorm.DB) errors.Error {
-       err := db.Migrator().RenameColumn(&archived.PullRequestComment{}, 
"user_id", "account_id")
-       if err != nil {
-               return errors.Convert(err)
-       }
-       err = db.Migrator().RenameColumn(&archived.IssueComment{}, "user_id", 
"account_id")
+func (*renameColumnsOfPrCommentIssueComment) Up(basicRes core.BasicRes) 
errors.Error {
+       db := basicRes.GetDal()
+       err := db.RenameColumn("pull_request_comments", "user_id", "account_id")
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       return nil
+       return db.RenameColumn("issue_comments", "user_id", "account_id")
 }
 
 func (*renameColumnsOfPrCommentIssueComment) Version() uint64 {
diff --git a/models/migrationscripts/20220829_modify_tables_for_dora.go 
b/models/migrationscripts/20220829_modify_tables_for_dora.go
index 23ffc91e..dcc5c90d 100644
--- a/models/migrationscripts/20220829_modify_tables_for_dora.go
+++ b/models/migrationscripts/20220829_modify_tables_for_dora.go
@@ -18,31 +18,14 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
-type modifyTablesForDora struct{}
-
-func (*modifyTablesForDora) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().AutoMigrate(
-               &CICDPipeline0829{},
-               &PullRequest0829{},
-               &Issue0829{},
-       )
-       return errors.Convert(err)
-}
+var _ core.MigrationScript = (*modifyTablesForDora)(nil)
 
-func (*modifyTablesForDora) Version() uint64 {
-       return 20220829232735
-}
-
-func (*modifyTablesForDora) Name() string {
-       return "modify tables for dora"
-}
-
-type PullRequest0829 struct {
+type pullRequest20220829 struct {
        CodingTimespan int64
        ReviewLag      int64
        ReviewTimespan int64
@@ -50,22 +33,41 @@ type PullRequest0829 struct {
        ChangeTimespan int64
 }
 
-func (PullRequest0829) TableName() string {
+func (pullRequest20220829) TableName() string {
        return "pull_requests"
 }
 
-type Issue0829 struct {
+type issue20220829 struct {
        DeploymentId string `gorm:"type:varchar(255)"`
 }
 
-func (Issue0829) TableName() string {
+func (issue20220829) TableName() string {
        return "issues"
 }
 
-type CICDPipeline0829 struct {
+type cicdPipeline20220829 struct {
        Environment string `gorm:"type:varchar(255)"`
 }
 
-func (CICDPipeline0829) TableName() string {
+func (cicdPipeline20220829) TableName() string {
        return "cicd_pipelines"
 }
+
+type modifyTablesForDora struct{}
+
+func (*modifyTablesForDora) Up(basicRes core.BasicRes) errors.Error {
+       return migrationhelper.AutoMigrateTables(
+               basicRes,
+               &cicdPipeline20220829{},
+               &pullRequest20220829{},
+               &issue20220829{},
+       )
+}
+
+func (*modifyTablesForDora) Version() uint64 {
+       return 20220829232735
+}
+
+func (*modifyTablesForDora) Name() string {
+       return "modify tables for dora"
+}
diff --git a/models/migrationscripts/20220830_add_type_field_in_board.go 
b/models/migrationscripts/20220830_add_type_field_in_board.go
index 22855e1f..1a8b8e59 100644
--- a/models/migrationscripts/20220830_add_type_field_in_board.go
+++ b/models/migrationscripts/20220830_add_type_field_in_board.go
@@ -18,37 +18,30 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
-       "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
-       "time"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
-type addFieldTask20220830 struct {
-       archived.DomainEntity
-       Name        string `gorm:"type:varchar(255)"`
-       Description string
-       Url         string `gorm:"type:varchar(255)"`
-       CreatedDate *time.Time
-       Type        string `gorm:"type:varchar(255)"`
+var _ core.MigrationScript = (*addTypeToBoard)(nil)
+
+type boards20220830 struct {
+       Type string `gorm:"type:varchar(255)"`
 }
 
-func (addFieldTask20220830) TableName() string {
+func (boards20220830) TableName() string {
        return "boards"
 }
 
-type addTypeFieldInBoard struct{}
+type addTypeToBoard struct{}
 
-func (*addTypeFieldInBoard) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().AddColumn(addFieldTask20220830{}, "type")
-       return errors.Convert(err)
+func (*addTypeToBoard) Up(basicRes core.BasicRes) errors.Error {
+       return basicRes.GetDal().AutoMigrate(&boards20220830{})
 }
 
-func (*addTypeFieldInBoard) Version() uint64 {
+func (*addTypeToBoard) Version() uint64 {
        return 20220830142321
 }
 
-func (*addTypeFieldInBoard) Name() string {
+func (*addTypeToBoard) Name() string {
        return "add column `type` at boards"
 }
diff --git a/models/migrationscripts/20220903_encrypt_blueprint.go 
b/models/migrationscripts/20220903_encrypt_blueprint.go
index 0031526f..c5171bc1 100644
--- a/models/migrationscripts/20220903_encrypt_blueprint.go
+++ b/models/migrationscripts/20220903_encrypt_blueprint.go
@@ -18,113 +18,81 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "encoding/json"
-       "github.com/apache/incubator-devlake/config"
+
        "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
        "github.com/apache/incubator-devlake/plugins/core"
-       "gorm.io/gorm"
 )
 
-type Blueprint0903Temp struct {
-       Name   string `json:"name" validate:"required"`
-       Mode   string `json:"mode" gorm:"varchar(20)" 
validate:"required,oneof=NORMAL ADVANCED"`
-       Enable bool   `json:"enable"`
-       //please check this https://crontab.guru/ for detail
-       CronConfig     string `json:"cronConfig" format:"* * * * *" example:"0 
0 * * 1"`
-       IsManual       bool   `json:"isManual"`
-       archived.Model `swaggerignore:"true"`
-       Plan           string `json:"plan"`
-       Settings       string `json:"settings"`
-}
+var _ core.MigrationScript = (*encryptBlueprint)(nil)
 
-func (Blueprint0903Temp) TableName() string {
-       return "_devlake_blueprints_tmp"
-}
+type encryptBlueprint struct{}
 
-type BlueprintOldVersion struct {
-       Name   string          `json:"name" validate:"required"`
-       Mode   string          `json:"mode" gorm:"varchar(20)" 
validate:"required,oneof=NORMAL ADVANCED"`
-       Plan   json.RawMessage `json:"plan"`
-       Enable bool            `json:"enable"`
-       //please check this https://crontab.guru/ for detail
+type Blueprint20220903Before struct {
+       Name           string          `json:"name" validate:"required"`
+       Mode           string          `json:"mode" gorm:"varchar(20)" 
validate:"required,oneof=NORMAL ADVANCED"`
+       Plan           json.RawMessage `json:"plan"`
+       Enable         bool            `json:"enable"`
        CronConfig     string          `json:"cronConfig" format:"* * * * *" 
example:"0 0 * * 1"`
        IsManual       bool            `json:"isManual"`
        Settings       json.RawMessage `json:"settings" 
swaggertype:"array,string" example:"please check api: 
/blueprints/<PLUGIN_NAME>/blueprint-setting"`
        archived.Model `swaggerignore:"true"`
 }
 
-func (BlueprintOldVersion) TableName() string {
-       return "_devlake_blueprints"
+type Blueprint20220903After struct {
+       /* unchanged part */
+       Name           string `json:"name" validate:"required"`
+       Mode           string `json:"mode" gorm:"varchar(20)" 
validate:"required,oneof=NORMAL ADVANCED"`
+       Enable         bool   `json:"enable"`
+       CronConfig     string `json:"cronConfig" format:"* * * * *" example:"0 
0 * * 1"`
+       IsManual       bool   `json:"isManual"`
+       archived.Model `swaggerignore:"true"`
+       /* changed part */
+       Plan     string `json:"plan"`
+       Settings string `json:"settings"`
 }
 
-type encryptBLueprint struct{}
-
-func (*encryptBLueprint) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().CreateTable(&Blueprint0903Temp{})
-       if err != nil {
-               return errors.Convert(err)
-       }
-       //nolint:errcheck
-       defer db.Migrator().DropTable(&Blueprint0903Temp{})
-
-       var result *gorm.DB
-       var blueprintList []BlueprintOldVersion
-       result = db.Find(&blueprintList)
-
-       if result.Error != nil {
-               return errors.Convert(result.Error)
-       }
-
-       // Encrypt all blueprints.plan&settings which had been stored before 
v0.14
-       for _, v := range blueprintList {
-               c := config.GetConfig()
-               encKey := c.GetString(core.EncodeKeyEnvStr)
-               if encKey == "" {
-                       return errors.BadInput.New("invalid encKey")
-               }
-               encryptedPlan, err := core.Encrypt(encKey, string(v.Plan))
-               if err != nil {
-                       return err
-               }
-               encryptedSettings, err := core.Encrypt(encKey, 
string(v.Settings))
-               if err != nil {
-                       return err
-               }
-               newBlueprint := &Blueprint0903Temp{
-                       Name:       v.Name,
-                       Mode:       v.Mode,
-                       Enable:     v.Enable,
-                       CronConfig: v.CronConfig,
-                       IsManual:   v.IsManual,
-                       Model:      archived.Model{ID: v.ID},
-                       Plan:       encryptedPlan,
-                       Settings:   encryptedSettings,
-               }
-               err = errors.Convert(db.Create(newBlueprint).Error)
-               if err != nil {
-                       return err
-               }
-       }
-
-       err = db.Migrator().DropTable(&BlueprintOldVersion{})
-       if err != nil {
-               return errors.Convert(err)
-       }
-
-       err = db.Migrator().RenameTable(Blueprint0903Temp{}, 
BlueprintOldVersion{})
-       if err != nil {
-               return errors.Convert(err)
+func (script *encryptBlueprint) Up(basicRes core.BasicRes) errors.Error {
+       encKey := basicRes.GetConfig(core.EncodeKeyEnvStr)
+       if encKey == "" {
+               return errors.BadInput.New("invalid encKey")
        }
 
-       return nil
+       return migrationhelper.TransformTable(
+               basicRes,
+               script,
+               "_devlake_blueprints",
+               func(s *Blueprint20220903Before) (*Blueprint20220903After, 
errors.Error) {
+                       encryptedPlan, err := core.Encrypt(encKey, 
string(s.Plan))
+                       if err != nil {
+                               return nil, err
+                       }
+                       encryptedSettings, err := core.Encrypt(encKey, 
string(s.Settings))
+                       if err != nil {
+                               return nil, err
+                       }
+
+                       dst := &Blueprint20220903After{
+                               Name:       s.Name,
+                               Mode:       s.Mode,
+                               Enable:     s.Enable,
+                               CronConfig: s.CronConfig,
+                               IsManual:   s.IsManual,
+                               Model:      archived.Model{ID: s.ID},
+                               Plan:       encryptedPlan,
+                               Settings:   encryptedSettings,
+                       }
+                       return dst, nil
+               },
+       )
 }
 
-func (*encryptBLueprint) Version() uint64 {
+func (*encryptBlueprint) Version() uint64 {
        return 20220904142321
 }
 
-func (*encryptBLueprint) Name() string {
+func (*encryptBlueprint) Name() string {
        return "encrypt Blueprint"
 }
diff --git a/models/migrationscripts/20220904_encrypt_pipeline.go 
b/models/migrationscripts/20220904_encrypt_pipeline.go
index b2e9e26e..d90ac70b 100644
--- a/models/migrationscripts/20220904_encrypt_pipeline.go
+++ b/models/migrationscripts/20220904_encrypt_pipeline.go
@@ -18,42 +18,25 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "time"
 
-       "github.com/apache/incubator-devlake/config"
        "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
        "github.com/apache/incubator-devlake/models/common"
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
        "github.com/apache/incubator-devlake/plugins/core"
        "gorm.io/datatypes"
-       "gorm.io/gorm"
 )
 
-type Pipeline0904Temp struct {
-       common.Model
-       Name          string     `json:"name" gorm:"index"`
-       BlueprintId   uint64     `json:"blueprintId"`
-       Plan          string     `json:"plan" encrypt:"yes"`
-       TotalTasks    int        `json:"totalTasks"`
-       FinishedTasks int        `json:"finishedTasks"`
-       BeganAt       *time.Time `json:"beganAt"`
-       FinishedAt    *time.Time `json:"finishedAt" gorm:"index"`
-       Status        string     `json:"status"`
-       Message       string     `json:"message"`
-       SpentSeconds  int        `json:"spentSeconds"`
-       Stage         int        `json:"stage"`
-}
+var _ core.MigrationScript = (*encryptPipeline)(nil)
 
-func (Pipeline0904Temp) TableName() string {
-       return "_devlake_pipeline_0904_tmp"
-}
+type encryptPipeline struct{}
 
-type PipelineOldVersion struct {
+type Pipeline20220904Before struct {
        archived.Model
        Name          string         `json:"name" gorm:"index"`
        BlueprintId   uint64         `json:"blueprintId"`
-       Plan          datatypes.JSON `json:"plan"`
+       Plan          datatypes.JSON `json:"plan"` // target field
        TotalTasks    int            `json:"totalTasks"`
        FinishedTasks int            `json:"finishedTasks"`
        BeganAt       *time.Time     `json:"beganAt"`
@@ -64,68 +47,57 @@ type PipelineOldVersion struct {
        Stage         int            `json:"stage"`
 }
 
-func (PipelineOldVersion) TableName() string {
-       return "_devlake_pipelines"
+type Pipeline0904After struct {
+       common.Model
+       Name          string     `json:"name" gorm:"index"`
+       BlueprintId   uint64     `json:"blueprintId"`
+       Plan          string     `json:"plan" encrypt:"yes"` // target field
+       TotalTasks    int        `json:"totalTasks"`
+       FinishedTasks int        `json:"finishedTasks"`
+       BeganAt       *time.Time `json:"beganAt"`
+       FinishedAt    *time.Time `json:"finishedAt" gorm:"index"`
+       Status        string     `json:"status"`
+       Message       string     `json:"message"`
+       SpentSeconds  int        `json:"spentSeconds"`
+       Stage         int        `json:"stage"`
 }
 
-type encryptPipeline struct{}
-
-func (*encryptPipeline) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().CreateTable(&Pipeline0904Temp{})
-       if err != nil {
-               return errors.Convert(err)
-       }
-       //nolint:errcheck
-       defer db.Migrator().DropTable(&Pipeline0904Temp{})
-
-       var result *gorm.DB
-       var pipelineList []PipelineOldVersion
-       result = db.Find(&pipelineList)
-
-       if result.Error != nil {
-               return errors.Convert(result.Error)
-       }
-
-       // Encrypt all pipelines.plan which had been stored before v0.14
-       for _, v := range pipelineList {
-               c := config.GetConfig()
-               encKey := c.GetString(core.EncodeKeyEnvStr)
-               if encKey == "" {
-                       return errors.BadInput.New("invalid encKey")
-               }
-               encryptedPlan, err := core.Encrypt(encKey, string(v.Plan))
-               if err != nil {
-                       return err
-               }
-               newPipeline := &Pipeline0904Temp{
-                       Name:          v.Name,
-                       BlueprintId:   v.BlueprintId,
-                       FinishedTasks: v.FinishedTasks,
-                       BeganAt:       v.BeganAt,
-                       FinishedAt:    v.FinishedAt,
-                       Status:        v.Status,
-                       Message:       v.Message,
-                       SpentSeconds:  v.SpentSeconds,
-                       Stage:         v.Stage,
-                       Plan:          encryptedPlan,
-               }
-               err = errors.Convert(db.Create(newPipeline).Error)
-               if err != nil {
-                       return err
-               }
-       }
+func (Pipeline20220904Before) TableName() string {
+       return "_devlake_pipelines"
+}
 
-       err = db.Migrator().DropTable(&PipelineOldVersion{})
-       if err != nil {
-               return errors.Convert(err)
+func (script *encryptPipeline) Up(basicRes core.BasicRes) errors.Error {
+       encKey := basicRes.GetConfig(core.EncodeKeyEnvStr)
+       if encKey == "" {
+               return errors.BadInput.New("invalid encKey")
        }
 
-       err = db.Migrator().RenameTable(Pipeline0904Temp{}, 
PipelineOldVersion{})
-       if err != nil {
-               return errors.Convert(err)
-       }
+       return migrationhelper.TransformTable(
+               basicRes,
+               script,
+               "_devlake_pipelines",
+               func(s *Pipeline20220904Before) (*Pipeline0904After, 
errors.Error) {
+                       encryptedPlan, err := core.Encrypt(encKey, 
string(s.Plan))
+                       if err != nil {
+                               return nil, err
+                       }
+
+                       dst := &Pipeline0904After{
+                               Name:          s.Name,
+                               BlueprintId:   s.BlueprintId,
+                               FinishedTasks: s.FinishedTasks,
+                               BeganAt:       s.BeganAt,
+                               FinishedAt:    s.FinishedAt,
+                               Status:        s.Status,
+                               Message:       s.Message,
+                               SpentSeconds:  s.SpentSeconds,
+                               Stage:         s.Stage,
+                               Plan:          encryptedPlan,
+                       }
+                       return dst, nil
+               },
+       )
 
-       return nil
 }
 
 func (*encryptPipeline) Version() uint64 {
diff --git a/models/migrationscripts/20220905_modfiy_cicd_pipeline.go 
b/models/migrationscripts/20220905_modfiy_cicd_pipeline.go
index dc9e0de1..75ea909d 100644
--- a/models/migrationscripts/20220905_modfiy_cicd_pipeline.go
+++ b/models/migrationscripts/20220905_modfiy_cicd_pipeline.go
@@ -18,71 +18,47 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/plugins/core"
 
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
 )
 
-type modifyPipeline struct{}
+var _ core.MigrationScript = (*modifyCicdPipeline)(nil)
 
-func (*modifyPipeline) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().DropColumn(CICDPipeline0905{}, "commit_sha")
-       if err != nil {
-               return errors.Convert(err)
-       }
-       err = db.Migrator().DropColumn(CICDPipeline0905{}, "branch")
-       if err != nil {
-               return errors.Convert(err)
-       }
-       err = db.Migrator().DropColumn(CICDPipeline0905{}, "repo")
+type modifyCicdPipeline struct{}
+
+type CICDPipelineRelationship20220905 struct {
+       ParentPipelineId string `gorm:"primaryKey;type:varchar(255)"`
+       ChildPipelineId  string `gorm:"primaryKey;type:varchar(255)"`
+       archived.NoPKModel
+}
+
+func (CICDPipelineRelationship20220905) TableName() string {
+       return "cicd_pipeline_relationships"
+}
+
+func (*modifyCicdPipeline) Up(basicRes core.BasicRes) errors.Error {
+       db := basicRes.GetDal()
+       err := db.DropColumns("cicd_pipelines", "commit_sha", "branch", "repo")
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       err = db.Migrator().RenameColumn(CICDPipelineRepo0905{}, "repo_url", 
"repo")
+       err = db.RenameColumn("cicd_pipeline_repos", "repo_url", "repo")
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       err = db.Migrator().AutoMigrate(CICDPipelineRelationship0905{})
+       err = db.AutoMigrate(&CICDPipelineRelationship20220905{})
        if err != nil {
                return errors.Convert(err)
        }
        return nil
 }
 
-func (*modifyPipeline) Version() uint64 {
+func (*modifyCicdPipeline) Version() uint64 {
        return 20220905232735
 }
 
-func (*modifyPipeline) Name() string {
+func (*modifyCicdPipeline) Name() string {
        return "modify cicd pipeline"
 }
-
-type CICDPipeline0905 struct {
-       CommitSha string `gorm:"type:varchar(255);index"`
-       Branch    string `gorm:"type:varchar(255);index"`
-       Repo      string `gorm:"type:varchar(255);index"`
-}
-
-func (CICDPipeline0905) TableName() string {
-       return "cicd_pipelines"
-}
-
-type CICDPipelineRepo0905 struct {
-       RepoUrl string `gorm:"type:varchar(255)"`
-}
-
-func (CICDPipelineRepo0905) TableName() string {
-       return "cicd_pipeline_repos"
-}
-
-type CICDPipelineRelationship0905 struct {
-       ParentPipelineId string `gorm:"primaryKey;type:varchar(255)"`
-       ChildPipelineId  string `gorm:"primaryKey;type:varchar(255)"`
-       archived.NoPKModel
-}
-
-func (CICDPipelineRelationship0905) TableName() string {
-       return "cicd_pipeline_relationships"
-}
diff --git a/models/migrationscripts/20220908_modfiy_cicd_tasks.go 
b/models/migrationscripts/20220908_modfiy_cicd_tasks.go
index f30060a3..fa669536 100644
--- a/models/migrationscripts/20220908_modfiy_cicd_tasks.go
+++ b/models/migrationscripts/20220908_modfiy_cicd_tasks.go
@@ -18,34 +18,30 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
-
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
+var _ core.MigrationScript = (*modifyCICDTasks)(nil)
+
 type modifyCICDTasks struct{}
 
-func (*modifyCICDTasks) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().AutoMigrate(CICDTask0905{})
-       if err != nil {
-               return errors.Convert(err)
-       }
-       return nil
+type CICDTask0905 struct {
+       Environment string `gorm:"type:varchar(255)"`
 }
 
-func (*modifyCICDTasks) Version() uint64 {
-       return 20220909232735
+func (CICDTask0905) TableName() string {
+       return "cicd_tasks"
 }
 
-func (*modifyCICDTasks) Name() string {
-       return "modify cicd tasks"
+func (*modifyCICDTasks) Up(basicRes core.BasicRes) errors.Error {
+       return basicRes.GetDal().AutoMigrate(&CICDTask0905{})
 }
 
-type CICDTask0905 struct {
-       Environment string `gorm:"type:varchar(255)"`
+func (*modifyCICDTasks) Version() uint64 {
+       return 20220909232735
 }
 
-func (CICDTask0905) TableName() string {
-       return "cicd_tasks"
+func (*modifyCICDTasks) Name() string {
+       return "modify cicd tasks"
 }
diff --git a/models/migrationscripts/20220913_add_origin_value_for_pr.go 
b/models/migrationscripts/20220913_add_origin_value_for_pr.go
index 80959310..b26c61d8 100644
--- a/models/migrationscripts/20220913_add_origin_value_for_pr.go
+++ b/models/migrationscripts/20220913_add_origin_value_for_pr.go
@@ -18,45 +18,13 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
-
        "github.com/apache/incubator-devlake/errors"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
-type addOriginChangeValueForPr struct{}
-
-func (*addOriginChangeValueForPr) Up(ctx context.Context, db *gorm.DB) 
errors.Error {
-       err := db.Migrator().AddColumn(PullRequest0913{}, 
"orig_coding_timespan")
-       if err != nil {
-               return errors.Convert(err)
-       }
-       err = db.Migrator().AddColumn(PullRequest0913{}, "orig_review_lag")
-       if err != nil {
-               return errors.Convert(err)
-       }
-       err = db.Migrator().AddColumn(PullRequest0913{}, "orig_review_timespan")
-       if err != nil {
-               return errors.Convert(err)
-       }
-       err = db.Migrator().AddColumn(PullRequest0913{}, "orig_deploy_timespan")
-       if err != nil {
-               return errors.Convert(err)
-       }
-
-       if err != nil {
-               return errors.Convert(err)
-       }
-       return nil
-}
-
-func (*addOriginChangeValueForPr) Version() uint64 {
-       return 20220913235535
-}
+var _ core.MigrationScript = (*addOriginChangeValueForPr)(nil)
 
-func (*addOriginChangeValueForPr) Name() string {
-       return "add origin change lead time for pr"
-}
+type addOriginChangeValueForPr struct{}
 
 type PullRequest0913 struct {
        OrigCodingTimespan int64
@@ -68,3 +36,15 @@ type PullRequest0913 struct {
 func (PullRequest0913) TableName() string {
        return "pull_requests"
 }
+
+func (*addOriginChangeValueForPr) Up(basicRes core.BasicRes) errors.Error {
+       return basicRes.GetDal().AutoMigrate(&PullRequest0913{})
+}
+
+func (*addOriginChangeValueForPr) Version() uint64 {
+       return 20220913235535
+}
+
+func (*addOriginChangeValueForPr) Name() string {
+       return "add origin change lead time for pr"
+}
diff --git a/models/migrationscripts/20220913_modfiy_board_repos.go 
b/models/migrationscripts/20220913_add_raw_data_origin_to_board_repos.go
similarity index 64%
rename from models/migrationscripts/20220913_modfiy_board_repos.go
rename to models/migrationscripts/20220913_add_raw_data_origin_to_board_repos.go
index fd432693..4bf2074b 100644
--- a/models/migrationscripts/20220913_modfiy_board_repos.go
+++ b/models/migrationscripts/20220913_add_raw_data_origin_to_board_repos.go
@@ -18,35 +18,32 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
-type modifyBoardRepos struct{}
+var _ core.MigrationScript = (*addRawDataOriginToBoardRepos)(nil)
+
+// addRawDataOriginToBoardRepos add raw data fields to board_repos
+type addRawDataOriginToBoardRepos struct{}
 
-func (*modifyBoardRepos) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().AutoMigrate(BoardRepo0913{})
-       if err != nil {
-               return errors.Convert(err)
-       }
-       return nil
+type boardRepo20220913 struct {
+       archived.NoPKModel
 }
 
-func (*modifyBoardRepos) Version() uint64 {
-       return 20220913232735
+func (boardRepo20220913) TableName() string {
+       return "board_repos"
 }
 
-func (*modifyBoardRepos) Name() string {
-       return "modify board repos"
+func (*addRawDataOriginToBoardRepos) Up(basicRes core.BasicRes) errors.Error {
+       return basicRes.GetDal().AutoMigrate(&boardRepo20220913{})
 }
 
-type BoardRepo0913 struct {
-       archived.NoPKModel
+func (*addRawDataOriginToBoardRepos) Version() uint64 {
+       return 20220913232735
 }
 
-func (BoardRepo0913) TableName() string {
-       return "board_repos"
+func (*addRawDataOriginToBoardRepos) Name() string {
+       return "modify board repos"
 }
diff --git a/models/migrationscripts/20220913_commitfile_add_length.go 
b/models/migrationscripts/20220913_commitfile_add_length.go
deleted file mode 100644
index a65937d4..00000000
--- a/models/migrationscripts/20220913_commitfile_add_length.go
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package migrationscripts
-
-import (
-       "context"
-       "crypto/sha256"
-       "encoding/hex"
-       "fmt"
-       "reflect"
-       "strings"
-
-       "github.com/apache/incubator-devlake/errors"
-       "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "github.com/apache/incubator-devlake/plugins/gitlab/api"
-       "github.com/apache/incubator-devlake/plugins/helper"
-       "gorm.io/gorm"
-)
-
-type CommitFileAddLength struct {
-       archived.DomainEntity
-       CommitSha string `gorm:"type:varchar(40)"`
-       FilePath  string `gorm:"type:text"`
-       Additions int
-       Deletions int
-}
-
-func (CommitFileAddLength) TableName() string {
-       return "commit_files"
-}
-
-type CommitFileAddLengthBak struct {
-       archived.DomainEntity
-       CommitSha string `gorm:"type:varchar(40)"`
-       FilePath  string `gorm:"type:varchar(255)"`
-       Additions int
-       Deletions int
-}
-
-func (CommitFileAddLengthBak) TableName() string {
-       return "commit_files_bak"
-}
-
-type CommitFileComponentBak struct {
-       archived.NoPKModel
-       CommitFileId  string `gorm:"primaryKey;type:varchar(255)"`
-       ComponentName string `gorm:"type:varchar(255)"`
-}
-
-func (CommitFileComponentBak) TableName() string {
-       return "commit_file_components_bak"
-}
-
-type addCommitFilePathLength struct{}
-
-func (*addCommitFilePathLength) Up(ctx context.Context, db *gorm.DB) (errs 
errors.Error) {
-       var err error
-
-       // rename the commit_file_bak to cache old table
-       err = db.Migrator().RenameTable(&CommitFile{}, 
&CommitFileAddLengthBak{})
-       if err != nil {
-               return errors.Default.Wrap(err, "error no rename commit_file to 
commit_files_bak")
-       }
-
-       // rollback for rename back
-       defer func() {
-               if errs != nil {
-                       err = 
db.Migrator().RenameTable(&CommitFileAddLengthBak{}, &CommitFile{})
-                       if err != nil {
-                               errs = errors.Default.Wrap(err, 
fmt.Sprintf("fail to rollback table commit_file_bak , you must to rollback by 
yourself. %s", err.Error()))
-                       }
-               }
-       }()
-
-       // create new commit_files table
-       err = db.Migrator().AutoMigrate(&CommitFileAddLength{})
-       if err != nil {
-               return errors.Default.Wrap(err, "error on auto migrate 
commit_file")
-       }
-
-       // rollback for create new table
-       defer func() {
-               if errs != nil {
-                       err = db.Migrator().DropTable(&CommitFile{})
-                       if err != nil {
-                               errs = errors.Default.Wrap(err, 
fmt.Sprintf("fail to rollback table CommitFile , you must to rollback by 
yourself. %s", err.Error()))
-                       }
-               }
-       }()
-
-       // update old id to new id and write to the new table
-       cursor, err := db.Model(&CommitFileAddLengthBak{}).Rows()
-       if err != nil {
-               return errors.Default.Wrap(err, "error on select 
CommitFileAddLength")
-       }
-       defer cursor.Close()
-
-       // caculate and save the data to new table
-       batch, err := helper.NewBatchSave(api.BasicRes, 
reflect.TypeOf(&CommitFileAddLength{}), 200)
-       if err != nil {
-               return errors.Default.Wrap(err, "error getting batch from table 
commit_file")
-       }
-
-       defer batch.Close()
-       for cursor.Next() {
-               cfb := CommitFileAddLengthBak{}
-               err = db.ScanRows(cursor, &cfb)
-               if err != nil {
-                       return errors.Default.Wrap(err, "error scan rows from 
table commit_files_bak")
-               }
-
-               cf := CommitFileAddLength(cfb)
-
-               // With some long path,the varchar(255) was not enough both ID 
and file_path
-               // So we use the hash to compress the path in ID and add length 
of file_path.
-               shaFilePath := sha256.New()
-               shaFilePath.Write([]byte(cf.FilePath))
-               cf.Id = cf.CommitSha + ":" + 
hex.EncodeToString(shaFilePath.Sum(nil))
-
-               err = batch.Add(&cf)
-               if err != nil {
-                       return errors.Default.Wrap(err, "error on commit_files 
batch add")
-               }
-       }
-
-       // rename the commit_file_components_bak
-       err = db.Migrator().RenameTable(&CommitFileComponent{}, 
&CommitFileComponentBak{})
-       if err != nil {
-               return errors.Default.Wrap(err, "error no rename 
commit_file_components to commit_file_components_bak")
-       }
-
-       // rollback for rename back
-       defer func() {
-               if errs != nil {
-                       err = 
db.Migrator().RenameTable(&CommitFileComponentBak{}, &CommitFileComponent{})
-                       if err != nil {
-                               errs = errors.Default.Wrap(err, 
fmt.Sprintf("fail to rollback table commit_file_components_bak , you must to 
rollback by yourself. %s", err.Error()))
-                       }
-               }
-       }()
-
-       // create new commit_file_components table
-       err = db.Migrator().AutoMigrate(&CommitFileComponent{})
-       if err != nil {
-               return errors.Default.Wrap(err, "error on auto migrate 
commit_file")
-       }
-
-       // rollback for create new table
-       defer func() {
-               if errs != nil {
-                       err = db.Migrator().DropTable(&CommitFileComponent{})
-                       if err != nil {
-                               errs = errors.Default.Wrap(err, 
fmt.Sprintf("fail to rollback table commit_file_components , you must to 
rollback by yourself. %s", err.Error()))
-                       }
-               }
-       }()
-
-       // update old id to new id and write to the new table
-       cursor2, err := db.Model(&CommitFileComponentBak{}).Rows()
-       if err != nil {
-               return errors.Default.Wrap(err, "error on select 
commit_file_components_bak")
-       }
-       defer cursor2.Close()
-
-       // caculate and save the data to new table
-       batch2, err := helper.NewBatchSave(api.BasicRes, 
reflect.TypeOf(&CommitFileComponent{}), 500)
-       if err != nil {
-               return errors.Default.Wrap(err, "error getting batch from table 
commit_file_components")
-       }
-       defer batch2.Close()
-
-       for cursor2.Next() {
-               cfcb := CommitFileComponentBak{}
-               err = db.ScanRows(cursor2, &cfcb)
-               if err != nil {
-                       return errors.Default.Wrap(err, "error scan rows from 
table commit_file_components_bak")
-               }
-
-               cfc := CommitFileComponent(cfcb)
-
-               ids := strings.Split(cfc.CommitFileId, ":")
-
-               commitSha := ""
-               filePath := ""
-
-               if len(ids) > 0 {
-                       commitSha = ids[0]
-                       if len(ids) > 1 {
-                               for i := 1; i < len(ids); i++ {
-                                       if i > 1 {
-                                               filePath += ":"
-                                       }
-                                       filePath += ids[i]
-                               }
-                       }
-               }
-
-               // With some long path,the varchar(255) was not enough both ID 
and file_path
-               // So we use the hash to compress the path in ID and add length 
of file_path.
-               shaFilePath := sha256.New()
-               shaFilePath.Write([]byte(filePath))
-               cfc.CommitFileId = commitSha + ":" + 
hex.EncodeToString(shaFilePath.Sum(nil))
-
-               err = batch2.Add(&cfc)
-               if err != nil {
-                       return errors.Default.Wrap(err, "error on 
commit_file_components batch add")
-               }
-       }
-
-       // drop the old table
-       err = db.Migrator().DropTable(&CommitFileAddLengthBak{})
-       if err != nil {
-               return errors.Default.Wrap(err, "error no drop 
commit_files_bak")
-       }
-       err = db.Migrator().DropTable(&CommitFileComponentBak{})
-       if err != nil {
-               return errors.Default.Wrap(err, "error no drop 
commit_file_components_bak")
-       }
-
-       return nil
-}
-
-func (*addCommitFilePathLength) Version() uint64 {
-       return 20220913165805
-}
-
-func (*addCommitFilePathLength) Name() string {
-       return "add length of commit_file file_path"
-}
diff --git a/models/migrationscripts/20220913_fix_commitfile_id_toolong.go 
b/models/migrationscripts/20220913_fix_commitfile_id_toolong.go
new file mode 100644
index 00000000..70470870
--- /dev/null
+++ b/models/migrationscripts/20220913_fix_commitfile_id_toolong.go
@@ -0,0 +1,114 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+       "crypto/sha256"
+       "encoding/hex"
+       "strings"
+
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
+       "github.com/apache/incubator-devlake/models/migrationscripts/archived"
+       "github.com/apache/incubator-devlake/plugins/core"
+)
+
+var _ core.MigrationScript = (*fixCommitFileIdTooLong)(nil)
+
+type fixCommitFileIdTooLong struct{}
+
+type commitFile20220913Before struct {
+       archived.DomainEntity
+       CommitSha string `gorm:"type:varchar(40)"`
+       FilePath  string `gorm:"type:varchar(255)"` // target field
+}
+
+type commitFile20220913After struct {
+       archived.DomainEntity
+       CommitSha string `gorm:"type:varchar(40)"`
+       FilePath  string `gorm:"type:text"` // target field
+}
+
+type commitFileComponent20220913 struct {
+       archived.NoPKModel
+       CommitFileId  string `gorm:"primaryKey;type:varchar(255)"`
+       ComponentName string `gorm:"type:varchar(255)"`
+}
+
+func (script *fixCommitFileIdTooLong) Up(basicRes core.BasicRes) errors.Error {
+       // migrate main table
+       err := migrationhelper.TransformTable(
+               basicRes,
+               script,
+               "commit_files",
+               func(s *commitFile20220913Before) (*commitFile20220913After, 
errors.Error) {
+                       // copy data
+                       dst := commitFile20220913After(*s)
+                       // generate new id with hashed file path to avoid 
length problem
+                       shaFilePath := sha256.New()
+                       shaFilePath.Write([]byte(dst.FilePath))
+                       dst.Id = dst.CommitSha + ":" + 
hex.EncodeToString(shaFilePath.Sum(nil))
+                       return &dst, nil
+               },
+       )
+       if err != nil {
+               return err
+       }
+       // migrate related table
+       return migrationhelper.TransformTable(
+               basicRes,
+               script,
+               "commit_files",
+               func(s *commitFileComponent20220913) 
(*commitFileComponent20220913, errors.Error) {
+                       // copy data
+                       dst := commitFileComponent20220913(*s)
+                       // generate new id with hashed file path to avoid 
length problem
+                       ids := strings.Split(dst.CommitFileId, ":")
+
+                       commitSha := ""
+                       filePath := ""
+
+                       if len(ids) > 0 {
+                               commitSha = ids[0]
+                               if len(ids) > 1 {
+                                       for i := 1; i < len(ids); i++ {
+                                               if i > 1 {
+                                                       filePath += ":"
+                                               }
+                                               filePath += ids[i]
+                                       }
+                               }
+                       }
+
+                       // With some long path,the varchar(255) was not enough 
both ID and file_path
+                       // So we use the hash to compress the path in ID and 
add length of file_path.
+                       shaFilePath := sha256.New()
+                       shaFilePath.Write([]byte(filePath))
+                       dst.CommitFileId = commitSha + ":" + 
hex.EncodeToString(shaFilePath.Sum(nil))
+                       return &dst, nil
+               },
+       )
+}
+
+func (*fixCommitFileIdTooLong) Version() uint64 {
+       return 20220913165805
+}
+
+func (*fixCommitFileIdTooLong) Name() string {
+       return "add length of commit_file file_path"
+}
diff --git a/models/migrationscripts/20220915_rename_pipeline_commits.go 
b/models/migrationscripts/20220915_rename_pipeline_commits.go
index d4a7a21b..69da85b7 100644
--- a/models/migrationscripts/20220915_rename_pipeline_commits.go
+++ b/models/migrationscripts/20220915_rename_pipeline_commits.go
@@ -18,26 +18,27 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models/migrationscripts/archived"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
+var _ core.MigrationScript = (*renamePipelineCommits)(nil)
+
 type renamePipelineCommits struct{}
 
-type CiCDPipelineRepoOld struct {
+type CiCDPipelineRepo20220915Before struct {
        archived.DomainEntity
        CommitSha string `gorm:"primaryKey;type:varchar(255)"`
        Branch    string `gorm:"type:varchar(255)"`
        Repo      string `gorm:"type:varchar(255)"`
 }
 
-func (CiCDPipelineRepoOld) TableName() string {
+func (CiCDPipelineRepo20220915Before) TableName() string {
        return "cicd_pipeline_repos"
 }
 
-type CiCDPipelineRepo0915 struct {
+type CiCDPipelineRepo20220915After struct {
        archived.NoPKModel
        PipelineId string `gorm:"primaryKey;type:varchar(255)"`
        CommitSha  string `gorm:"primaryKey;type:varchar(255)"`
@@ -46,27 +47,27 @@ type CiCDPipelineRepo0915 struct {
        RepoUrl    string
 }
 
-func (CiCDPipelineRepo0915) TableName() string {
+func (CiCDPipelineRepo20220915After) TableName() string {
        return "cicd_pipeline_commits"
 }
 
-func (*renamePipelineCommits) Up(ctx context.Context, db *gorm.DB) 
errors.Error {
-       err := db.Migrator().RenameTable(CiCDPipelineRepoOld{}, 
CiCDPipelineRepo0915{})
-       if err != nil {
-               return errors.Convert(err)
-       }
-       err = db.Migrator().RenameColumn(CiCDPipelineRepo0915{}, `id`, 
`pipeline_id`)
+func (*renamePipelineCommits) Up(basicRes core.BasicRes) errors.Error {
+       db := basicRes.GetDal()
+       err := db.RenameTable("cicd_pipeline_repos", "cicd_pipeline_commits")
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       err = db.Migrator().DropIndex(CiCDPipelineRepo0915{}, 
`idx_cicd_pipeline_repos_raw_data_params`)
+       // err = db.DropIndexes("cicd_pipeline_repos", 
`idx_cicd_pipeline_repos_raw_data_params`)
+       // if err != nil {
+       //      return err
+       // }
+       err = db.RenameColumn("cicd_pipeline_commits", "id", "pipeline_id")
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       // add index
-       err = db.Migrator().AutoMigrate(CiCDPipelineRepo0915{})
+       err = db.AutoMigrate(CiCDPipelineRepo20220915After{})
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
        return nil
 }
diff --git a/models/migrationscripts/20220918_commit_line_change.go 
b/models/migrationscripts/20220918_commit_line_change.go
index 031a0141..8e47c239 100644
--- a/models/migrationscripts/20220918_commit_line_change.go
+++ b/models/migrationscripts/20220918_commit_line_change.go
@@ -18,13 +18,16 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models/domainlayer"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
-type CommitLineChange struct {
+var _ core.MigrationScript = (*commitLineChange)(nil)
+
+type commitLineChange struct{}
+
+type commitLineChange20220918 struct {
        domainlayer.DomainEntity
        Id          string `gorm:"type:varchar(255);primaryKey"`
        CommitSha   string `gorm:"type:varchar(40);"`
@@ -37,18 +40,12 @@ type CommitLineChange struct {
        PrevCommit  string `gorm:"type:varchar(255)"`
 }
 
-func (CommitLineChange) TableName() string {
+func (commitLineChange20220918) TableName() string {
        return "commit_line_change"
 }
 
-type commitLineChange struct{}
-
-func (*commitLineChange) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().AutoMigrate(CommitLineChange{})
-       if err != nil {
-               return errors.Convert(err)
-       }
-       return nil
+func (*commitLineChange) Up(basicRes core.BasicRes) errors.Error {
+       return basicRes.GetDal().AutoMigrate(&commitLineChange20220918{})
 
 }
 
diff --git a/models/migrationscripts/20220927_add_snapshot.go 
b/models/migrationscripts/20220927_add_snapshot.go
index 42edb81a..a3192c62 100644
--- a/models/migrationscripts/20220927_add_snapshot.go
+++ b/models/migrationscripts/20220927_add_snapshot.go
@@ -18,13 +18,14 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models/common"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
 )
 
-type RepoSnapshot struct {
+var _ core.MigrationScript = (*addRepoSnapshot)(nil)
+
+type repoSnapshot20220918 struct {
        common.NoPKModel
        RepoId    string `gorm:"primaryKey;type:varchar(255)"`
        CommitSha string `gorm:"primaryKey;type:varchar(40);"`
@@ -32,18 +33,14 @@ type RepoSnapshot struct {
        LineNo    int    `gorm:"primaryKey;type:int;"`
 }
 
-func (RepoSnapshot) TableName() string {
+func (repoSnapshot20220918) TableName() string {
        return "repo_snapshot"
 }
 
 type addRepoSnapshot struct{}
 
-func (*addRepoSnapshot) Up(ctx context.Context, db *gorm.DB) errors.Error {
-       err := db.Migrator().AutoMigrate(RepoSnapshot{})
-       if err != nil {
-               return errors.Convert(err)
-       }
-       return nil
+func (*addRepoSnapshot) Up(basicRes core.BasicRes) errors.Error {
+       return basicRes.GetDal().AutoMigrate(repoSnapshot20220918{})
 }
 
 func (*addRepoSnapshot) Version() uint64 {
diff --git a/models/migrationscripts/20220929_modify_lead_time_minutes.go 
b/models/migrationscripts/20220929_modify_lead_time_minutes.go
index 28f8035a..81bd65ce 100644
--- a/models/migrationscripts/20220929_modify_lead_time_minutes.go
+++ b/models/migrationscripts/20220929_modify_lead_time_minutes.go
@@ -18,40 +18,57 @@ limitations under the License.
 package migrationscripts
 
 import (
-       "context"
-
        "github.com/apache/incubator-devlake/errors"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
+var _ core.MigrationScript = (*modifyLeadTimeMinutes)(nil)
+
 type modifyLeadTimeMinutes struct{}
 
-type newIssue struct {
+type Issues20220929 struct {
        LeadTimeMinutes int64
 }
 
-func (newIssue) TableName() string {
+func (Issues20220929) TableName() string {
        return "issues"
 }
 
-func (*modifyLeadTimeMinutes) Up(ctx context.Context, db *gorm.DB) 
errors.Error {
-       err := db.Migrator().RenameColumn(&newIssue{}, "lead_time_minutes", 
"lead_time_minutes_bak")
+func (*modifyLeadTimeMinutes) Up(basicRes core.BasicRes) errors.Error {
+       db := basicRes.GetDal()
+       bakColumnName := "lead_time_minutes_20220929"
+       err := db.RenameColumn("issues", "lead_time_minutes", bakColumnName)
+       defer func() {
+               if err != nil {
+                       _ = db.RenameColumn("issues", bakColumnName, 
"lead_time_minutes")
+               }
+       }()
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       err = db.Migrator().AddColumn(newIssue{}, "lead_time_minutes")
+       err = db.AutoMigrate(&Issues20220929{})
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       err = db.Model(&newIssue{}).Where("lead_time_minutes_bak > 
0").UpdateColumn("lead_time_minutes", gorm.Expr("lead_time_minutes_bak")).Error
+       defer func() {
+               if err != nil {
+                       _ = db.DropColumns("issues", "lead_time_minutes")
+               }
+       }()
+       err = db.UpdateColumn(
+               &Issues20220929{},
+               "lead_time_minutes",
+               dal.DalClause{Expr: bakColumnName},
+               dal.Where("lead_time_minutes != 0"),
+       )
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       err = db.Migrator().DropColumn(&newIssue{}, "lead_time_minutes_bak")
+       err = db.DropColumns("issues", bakColumnName)
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-       db.First(&newIssue{})
        return nil
 }
 
diff --git a/models/migrationscripts/register.go 
b/models/migrationscripts/register.go
index 31867c7d..a862dc24 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -17,34 +17,36 @@ limitations under the License.
 
 package migrationscripts
 
-import "github.com/apache/incubator-devlake/migration"
+import (
+       "github.com/apache/incubator-devlake/plugins/core"
+)
 
 // All return all the migration scripts of framework
-func All() []migration.Script {
-       return []migration.Script{
-               new(addFrameTables),
-               new(renameStepToStage),
-               new(addSubtasksField),
-               new(updateBlueprintMode),
+func All() []core.MigrationScript {
+       return []core.MigrationScript{
+               new(addFrameworkTables),
+               new(renamePipelineStepToStage),
+               new(addSubtaskToTaskTable),
+               new(addBlueprintMode),
                new(renameTasksToPlan),
-               new(addDomainTables),
-               new(commitfileComponent),
+               new(resetDomainTables),
+               new(addCommitFileComponent),
                new(removeNotes),
                new(addProjectMapping),
                new(renameColumnsOfPullRequestIssue),
                new(addNoPKModelToCommitParent),
                new(addSubtasksTable),
-               new(addCICD),
+               new(addCICDTables),
                new(renameColumnsOfPrCommentIssueComment),
                new(modifyTablesForDora),
-               new(addTypeFieldInBoard),
-               new(modifyPipeline),
-               new(encryptBLueprint),
+               new(addTypeToBoard),
+               new(encryptBlueprint),
                new(encryptPipeline),
+               new(modifyCicdPipeline),
                new(modifyCICDTasks),
-               new(modifyBoardRepos),
                new(addOriginChangeValueForPr),
-               new(addCommitFilePathLength),
+               new(fixCommitFileIdTooLong),
+               new(addRawDataOriginToBoardRepos),
                new(renamePipelineCommits),
                new(commitLineChange),
                new(modifyLeadTimeMinutes),
diff --git a/plugins/core/plugin_db_migration.go b/plugins/core/basic_res.go
similarity index 74%
copy from plugins/core/plugin_db_migration.go
copy to plugins/core/basic_res.go
index 005aca3d..f9ff6085 100644
--- a/plugins/core/plugin_db_migration.go
+++ b/plugins/core/basic_res.go
@@ -17,8 +17,13 @@ limitations under the License.
 
 package core
 
-import "github.com/apache/incubator-devlake/migration"
+import (
+       "github.com/apache/incubator-devlake/plugins/core/dal"
+)
 
-type Migratable interface {
-       MigrationScripts() []migration.Script
+// BasicRes defines a set of fundamental resources that needed pretty much 
everywhere in our system
+type BasicRes interface {
+       GetConfig(name string) string
+       GetLogger() Logger
+       GetDal() dal.Dal
 }
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 01bf9fa6..a4860a55 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -22,10 +22,13 @@ import (
        "reflect"
 
        "github.com/apache/incubator-devlake/errors"
-
-       "gorm.io/gorm/schema"
 )
 
+type Tabler interface {
+       TableName() string
+}
+
+// Clause represents SQL Clause
 type Clause struct {
        Type string
        Data interface{}
@@ -53,8 +56,8 @@ type Dal interface {
        AutoMigrate(entity interface{}, clauses ...Clause) errors.Error
        // AddColumn add column for the table
        AddColumn(table, columnName, columnType string) errors.Error
-       // DropColumn drop column from the table
-       DropColumn(table, columnName string) errors.Error
+       // DropColumns drop column from the table
+       DropColumns(table string, columnName ...string) errors.Error
        // Exec executes raw sql query
        Exec(query string, params ...interface{}) errors.Error
        // RawCursor executes raw sql query and returns a database cursor
@@ -75,7 +78,9 @@ type Dal interface {
        Create(entity interface{}, clauses ...Clause) errors.Error
        // Update updates record
        Update(entity interface{}, clauses ...Clause) errors.Error
-       // UpdateColumns batch records in database
+       // UpdateColumn allows you to update mulitple records
+       UpdateColumn(entity interface{}, columnName string, value interface{}, 
clauses ...Clause) errors.Error
+       // UpdateColumn allows you to update multiple columns of mulitple 
records
        UpdateColumns(entity interface{}, clauses ...Clause) errors.Error
        // CreateOrUpdate tries to create the record, or fallback to update all 
if failed
        CreateOrUpdate(entity interface{}, clauses ...Clause) errors.Error
@@ -85,18 +90,22 @@ type Dal interface {
        Delete(entity interface{}, clauses ...Clause) errors.Error
        // AllTables returns all tables in database
        AllTables() ([]string, errors.Error)
+       // DropTables drops all specified tables
+       DropTables(dst ...interface{}) errors.Error
+       // RenameTable renames table name
+       RenameTable(oldName, newName string) errors.Error
        // GetColumns returns table columns in database
-       GetColumns(dst schema.Tabler, filter func(columnMeta ColumnMeta) bool) 
(cms []ColumnMeta, err errors.Error)
+       GetColumns(dst Tabler, filter func(columnMeta ColumnMeta) bool) (cms 
[]ColumnMeta, err errors.Error)
        // GetPrimarykeyFields get the PrimaryKey from `gorm` tag
        GetPrimaryKeyFields(t reflect.Type) []reflect.StructField
-       // RenameTable rename the oldName table to newName
-       RenameTable(oldName interface{}, newName interface{}) errors.Error
-       // DropTable drop the table
-       DropTable(dst ...interface{}) errors.Error
+       // RenameColumn renames column name for specified table
+       RenameColumn(table, oldColumnName, newColumnName string) errors.Error
+       // DropIndexes drops all specified tables
+       DropIndexes(table string, indexes ...string) errors.Error
 }
 
 // GetColumnNames returns table Column Names in database
-func GetColumnNames(d Dal, dst schema.Tabler, filter func(columnMeta 
ColumnMeta) bool) (names []string, err errors.Error) {
+func GetColumnNames(d Dal, dst Tabler, filter func(columnMeta ColumnMeta) 
bool) (names []string, err errors.Error) {
        columns, err := d.GetColumns(dst, filter)
        if err != nil {
                return
@@ -108,7 +117,7 @@ func GetColumnNames(d Dal, dst schema.Tabler, filter 
func(columnMeta ColumnMeta)
 }
 
 // GetPrimarykeyColumns get returns PrimaryKey table Meta in database
-func GetPrimarykeyColumns(d Dal, dst schema.Tabler) ([]ColumnMeta, 
errors.Error) {
+func GetPrimarykeyColumns(d Dal, dst Tabler) ([]ColumnMeta, errors.Error) {
        return d.GetColumns(dst, func(columnMeta ColumnMeta) bool {
                isPrimaryKey, ok := columnMeta.PrimaryKey()
                return isPrimaryKey && ok
@@ -116,7 +125,7 @@ func GetPrimarykeyColumns(d Dal, dst schema.Tabler) 
([]ColumnMeta, errors.Error)
 }
 
 // GetPrimarykeyColumnNames get returns PrimaryKey Column Names in database
-func GetPrimarykeyColumnNames(d Dal, dst schema.Tabler) (names []string, err 
errors.Error) {
+func GetPrimarykeyColumnNames(d Dal, dst Tabler) (names []string, err 
errors.Error) {
        pkColumns, err := GetPrimarykeyColumns(d, dst)
        if err != nil {
                return
diff --git a/plugins/core/plugin_db_migration.go 
b/plugins/core/plugin_migration.go
similarity index 53%
rename from plugins/core/plugin_db_migration.go
rename to plugins/core/plugin_migration.go
index 005aca3d..af8feb3b 100644
--- a/plugins/core/plugin_db_migration.go
+++ b/plugins/core/plugin_migration.go
@@ -17,8 +17,31 @@ limitations under the License.
 
 package core
 
-import "github.com/apache/incubator-devlake/migration"
+import (
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/migration"
+)
 
+// MigrationScript upgrades database to a newer version
+type MigrationScript interface {
+       Up(basicRes BasicRes) errors.Error
+       Version() uint64
+       Name() string
+}
+
+// Migrator is responsible for making sure the registered scripts get applied 
to database and only once
+type Migrator interface {
+       Register(scripts []MigrationScript, comment string)
+       Execute() errors.Error
+       HasPendingScripts() bool
+}
+
+// PluginMigration is implemented by the plugin to declare all migration 
script that have to be applied to the database
+type PluginMigration interface {
+       MigrationScripts() []MigrationScript
+}
+
+// TODO: remove this interface
 type Migratable interface {
        MigrationScripts() []migration.Script
 }
diff --git a/plugins/core/plugin_task.go b/plugins/core/plugin_task.go
index 802c1c44..ea09e98e 100644
--- a/plugins/core/plugin_task.go
+++ b/plugins/core/plugin_task.go
@@ -20,8 +20,6 @@ package core
 import (
        "context"
        "github.com/apache/incubator-devlake/errors"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
-       "gorm.io/gorm"
 )
 
 type ProgressType int
@@ -42,14 +40,6 @@ type RunningProgress struct {
        SubTaskNumber int
 }
 
-type BasicRes interface {
-       GetConfig(name string) string
-       GetLogger() Logger
-       // Deprecated: use dal instead
-       GetDb() *gorm.DB
-       GetDal() dal.Dal
-}
-
 // ExecContext This interface define all resources that needed for 
task/subtask execution
 type ExecContext interface {
        BasicRes
diff --git a/plugins/customize/api/api.go b/plugins/customize/api/api.go
index 13129cf9..39b53391 100644
--- a/plugins/customize/api/api.go
+++ b/plugins/customize/api/api.go
@@ -87,7 +87,7 @@ func deleteField(d dal.Dal, table, field string) errors.Error 
{
        if !exists {
                return nil
        }
-       err = d.DropColumn(table, field)
+       err = d.DropColumns(table, field)
        if err != nil {
                return errors.Default.Wrap(err, "DropColumn error")
        }
diff --git a/plugins/github/e2e/pr_commit_test.go 
b/plugins/github/e2e/pr_commit_test.go
index 479f6b36..9a34d84e 100644
--- a/plugins/github/e2e/pr_commit_test.go
+++ b/plugins/github/e2e/pr_commit_test.go
@@ -50,6 +50,7 @@ func TestPrCommitDataFlow(t *testing.T) {
        // verify extraction
        dataflowTester.FlushTabler(&models.GithubCommit{})
        dataflowTester.FlushTabler(&models.GithubPrCommit{})
+       dataflowTester.FlushTabler(&models.GithubRepoCommit{})
        dataflowTester.Subtask(tasks.ExtractApiPullRequestCommitsMeta, taskData)
        dataflowTester.VerifyTable(
                models.GithubCommit{},
diff --git a/plugins/github/impl/impl.go b/plugins/github/impl/impl.go
index c0459971..a6290b3a 100644
--- a/plugins/github/impl/impl.go
+++ b/plugins/github/impl/impl.go
@@ -37,7 +37,8 @@ var _ core.PluginMeta = (*Github)(nil)
 var _ core.PluginInit = (*Github)(nil)
 var _ core.PluginTask = (*Github)(nil)
 var _ core.PluginApi = (*Github)(nil)
-var _ core.Migratable = (*Github)(nil)
+
+// var _ core.Migratable = (*Github)(nil)
 var _ core.PluginBlueprintV100 = (*Github)(nil)
 var _ core.CloseablePluginTask = (*Github)(nil)
 
diff --git a/plugins/helper/batch_save.go b/plugins/helper/batch_save.go
index f41df20b..427e8694 100644
--- a/plugins/helper/batch_save.go
+++ b/plugins/helper/batch_save.go
@@ -41,10 +41,11 @@ type BatchSave struct {
        size       int
        valueIndex map[string]int
        primaryKey []reflect.StructField
+       tableName  string
 }
 
 // NewBatchSave creates a new BatchSave instance
-func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int) 
(*BatchSave, errors.Error) {
+func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int, 
tableName ...string) (*BatchSave, errors.Error) {
        if slotType.Kind() != reflect.Ptr {
                return nil, errors.Default.New("slotType must be a pointer")
        }
@@ -54,6 +55,10 @@ func NewBatchSave(basicRes core.BasicRes, slotType 
reflect.Type, size int) (*Bat
        if len(primaryKey) == 0 {
                return nil, errors.Default.New(fmt.Sprintf("%s no primary key", 
slotType.String()))
        }
+       tn := ""
+       if len(tableName) == 1 {
+               tn = tableName[0]
+       }
 
        log := basicRes.GetLogger().Nested(slotType.String())
        return &BatchSave{
@@ -65,6 +70,7 @@ func NewBatchSave(basicRes core.BasicRes, slotType 
reflect.Type, size int) (*Bat
                size:       size,
                valueIndex: make(map[string]int),
                primaryKey: primaryKey,
+               tableName:  tn,
        }, nil
 }
 
@@ -101,7 +107,11 @@ func (c *BatchSave) Add(slot interface{}) errors.Error {
 
 // Flush save cached records into database
 func (c *BatchSave) Flush() errors.Error {
-       err := c.db.CreateOrUpdate(c.slots.Slice(0, c.current).Interface())
+       clauses := make([]dal.Clause, 0)
+       if c.tableName != "" {
+               clauses = append(clauses, dal.From(c.tableName))
+       }
+       err := c.db.CreateOrUpdate(c.slots.Slice(0, c.current).Interface(), 
clauses...)
        if err != nil {
                return err
        }
diff --git a/plugins/helper/default_task_context.go 
b/plugins/helper/default_task_context.go
index 9e196fe2..f9aea668 100644
--- a/plugins/helper/default_task_context.go
+++ b/plugins/helper/default_task_context.go
@@ -35,7 +35,8 @@ import (
 // bridge to current implementation at this point
 // TODO: implement another TaskContext for distributed runner/worker
 
-// DefaultBasicRes FIXME ...
+// DefaultBasicRes offers a defult BasicRes implementation
+// TODO: move to `impl` package
 type DefaultBasicRes struct {
        cfg    *viper.Viper
        logger core.Logger
@@ -63,7 +64,7 @@ func (c *DefaultBasicRes) GetLogger() core.Logger {
        return c.logger
 }
 
-// NewDefaultBasicRes FIXME ...
+// NewDefaultBasicRes returns a new DefaultBasicRes instance
 func NewDefaultBasicRes(
        cfg *viper.Viper,
        logger core.Logger,
diff --git a/runner/directrun.go b/runner/directrun.go
index 7c67c9f1..cbe72ee2 100644
--- a/runner/directrun.go
+++ b/runner/directrun.go
@@ -28,6 +28,8 @@ import (
        "syscall"
 
        "github.com/apache/incubator-devlake/config"
+       "github.com/apache/incubator-devlake/impl"
+       "github.com/apache/incubator-devlake/impl/dalgorm"
        "github.com/apache/incubator-devlake/logger"
        "github.com/apache/incubator-devlake/migration"
        "github.com/apache/incubator-devlake/plugins/core"
@@ -72,9 +74,12 @@ func DirectRun(cmd *cobra.Command, args []string, pluginTask 
core.PluginTask, op
        }
 
        // collect migration and run
-       migration.Init(db)
-       if migratable, ok := pluginTask.(core.Migratable); ok {
-               RegisterMigrationScripts(migratable.MigrationScripts(), 
cmd.Use, cfg, log)
+       migrator, err := InitMigrator(impl.NewDefaultBasicRes(cfg, log, 
dalgorm.NewDalgorm(db)))
+       if err != nil {
+               panic(err)
+       }
+       if migratable, ok := pluginTask.(core.PluginMigration); ok {
+               migrator.Register(migratable.MigrationScripts(), cmd.Use)
        }
        err = migration.Execute(context.Background())
        if err != nil {
diff --git a/runner/loader.go b/runner/loader.go
index 7d1503bb..ff50ff4c 100644
--- a/runner/loader.go
+++ b/runner/loader.go
@@ -57,9 +57,6 @@ func LoadPlugins(pluginsDir string, config *viper.Viper, 
logger core.Logger, db
                                        return err
                                }
                        }
-                       if migratable, ok := symPluginEntry.(core.Migratable); 
ok {
-                               
RegisterMigrationScripts(migratable.MigrationScripts(), pluginName, config, 
logger)
-                       }
                        err = core.RegisterPlugin(pluginName, pluginMeta)
                        if err != nil {
                                return nil
diff --git a/runner/migration.go b/runner/migration.go
index 650f6f0f..9c2b59dd 100644
--- a/runner/migration.go
+++ b/runner/migration.go
@@ -18,10 +18,36 @@ limitations under the License.
 package runner
 
 import (
-       "github.com/apache/incubator-devlake/migration"
+       "sync"
+
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/impl/migration"
        "github.com/apache/incubator-devlake/plugins/core"
 )
 
+var migrator core.Migrator
+
+var lock sync.Mutex
+
+// InitMigrator a Migrator singleton
+func InitMigrator(basicRes core.BasicRes) (core.Migrator, errors.Error) {
+       lock.Lock()
+       defer lock.Unlock()
+
+       if migrator != nil {
+               return nil, errors.Internal.New("migrator singleton has already 
been initialized")
+       }
+       var err errors.Error
+       migrator, err = migration.NewMigrator(basicRes)
+       return migrator, err
+}
+
+// GetMigrator returns the shared Migrator singleton
+func GetMigrator() core.Migrator {
+       return migrator
+}
+
+/*
 // RegisterMigrationScripts FIXME ...
 func RegisterMigrationScripts(scripts []migration.Script, comment string, 
config core.ConfigGetter, logger core.Logger) {
        for _, script := range scripts {
@@ -34,3 +60,4 @@ func RegisterMigrationScripts(scripts []migration.Script, 
comment string, config
        }
        migration.Register(scripts, comment)
 }
+*/
diff --git a/scripts/pm/gitlab/pipelines-list.sh 
b/scripts/pm/gitlab/pipelines-list.sh
new file mode 100755
index 00000000..2a5c559e
--- /dev/null
+++ b/scripts/pm/gitlab/pipelines-list.sh
@@ -0,0 +1,25 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+. "$(dirname $0)/../vars/active-vars.sh"
+
+project_id=${1-"11624398"}
+
+curl -sv $GITLAB_ENDPOINT/projects/$project_id/pipelines \
+    -H "Authorization: Bearer $GITLAB_TOKEN" \
+    | jq
diff --git a/services/init.go b/services/init.go
index cb1a4319..25ac9ab4 100644
--- a/services/init.go
+++ b/services/init.go
@@ -18,15 +18,15 @@ limitations under the License.
 package services
 
 import (
-       "context"
-       "sync"
        "time"
 
        "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/impl"
+       "sync"
 
        "github.com/apache/incubator-devlake/config"
+       "github.com/apache/incubator-devlake/impl/dalgorm"
        "github.com/apache/incubator-devlake/logger"
-       "github.com/apache/incubator-devlake/migration"
        "github.com/apache/incubator-devlake/models/migrationscripts"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/runner"
@@ -36,10 +36,11 @@ import (
 )
 
 var cfg *viper.Viper
+var log core.Logger
 var db *gorm.DB
+var basicRes core.BasicRes
+var migrator core.Migrator
 var cronManager *cron.Cron
-var log core.Logger
-var migrationRequireConfirmation bool
 var cronLocker sync.Mutex
 
 const failToCreateCronJob = "created cron job failed"
@@ -47,16 +48,22 @@ const failToCreateCronJob = "created cron job failed"
 // Init the services module
 func Init() {
        var err error
+       // basic resources initialization
        cfg = config.GetConfig()
        log = logger.Global
        db, err = runner.NewGormDb(cfg, logger.Global.Nested("db"))
-       location := cron.WithLocation(time.UTC)
-       cronManager = cron.New(location)
        if err != nil {
                panic(err)
        }
-       migration.Init(db)
-       runner.RegisterMigrationScripts(migrationscripts.All(), "Framework", 
cfg, logger.Global)
+       basicRes = impl.NewDefaultBasicRes(cfg, log, dalgorm.NewDalgorm(db))
+
+       // initialize db migrator singletone
+       migrator, err = runner.InitMigrator(basicRes)
+       if err != nil {
+               panic(err)
+       }
+       migrator.Register(migrationscripts.All(), "Framework")
+
        // load plugins
        err = runner.LoadPlugins(
                cfg.GetString("PLUGIN_DIR"),
@@ -67,31 +74,41 @@ func Init() {
        if err != nil {
                panic(err)
        }
+       for pluginName, pluginInst := range core.AllPlugins() {
+               if migratable, ok := pluginInst.(core.PluginMigration); ok {
+                       migrator.Register(migratable.MigrationScripts(), 
pluginName)
+               }
+       }
        forceMigration := cfg.GetBool("FORCE_MIGRATION")
-       if !migration.NeedConfirmation() || forceMigration {
+       if !migrator.HasPendingScripts() || forceMigration {
                err = ExecuteMigration()
                if err != nil {
                        panic(err)
                }
-       } else {
-               migrationRequireConfirmation = true
        }
-       log.Info("Db migration confirmation needed: %v", 
migrationRequireConfirmation)
+       log.Info("Db migration confirmation needed")
 }
 
 // ExecuteMigration executes all pending migration scripts and initialize 
services module
 func ExecuteMigration() errors.Error {
-       err := migration.Execute(context.Background())
+       // apply all pending migration scripts
+       err := migrator.Execute()
        if err != nil {
                return err
        }
+
+       // cronjob for blueprint triggering
+       location := cron.WithLocation(time.UTC)
+       cronManager = cron.New(location)
+       if err != nil {
+               panic(err)
+       }
        // call service init
        pipelineServiceInit()
-       migrationRequireConfirmation = false
        return nil
 }
 
 // MigrationRequireConfirmation returns if there were migration scripts 
waiting to be executed
 func MigrationRequireConfirmation() bool {
-       return migrationRequireConfirmation
+       return migrator.HasPendingScripts()
 }

Reply via email to