This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 94c7dd82 fix(gitlab): modify migraion script
94c7dd82 is described below

commit 94c7dd8203795b3f0a7990b4f375b3f2a5819eba
Author: Yingchu Chen <[email protected]>
AuthorDate: Fri Sep 9 19:16:07 2022 +0800

    fix(gitlab): modify migraion script
    
    relate to #2993
---
 impl/dalgorm/dalgorm.go                            |  5 ++
 plugins/core/dal/dal.go                            |  2 +
 plugins/gitlab/api/blueprint.go                    |  2 +-
 plugins/gitlab/api/connection.go                   |  2 +-
 plugins/gitlab/api/init.go                         |  6 +-
 plugins/gitlab/api/proxy.go                        |  2 +-
 .../20220906_fix_duration_to_float8.go             | 17 +++--
 plugins/helper/batch_save.go                       | 72 ++++++++++++++++++++--
 8 files changed, 90 insertions(+), 18 deletions(-)

diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 2b41cfe6..59ac6472 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -146,6 +146,11 @@ func (d *Dalgorm) Delete(entity interface{}, clauses 
...dal.Clause) error {
        return buildTx(d.db, clauses).Delete(entity).Error
 }
 
+// UpdateColumns batch records in database
+func (d *Dalgorm) UpdateColumns(entity interface{}, clauses ...dal.Clause) 
error {
+       return buildTx(d.db, clauses).UpdateColumns(entity).Error
+}
+
 // GetColumns FIXME ...
 func (d *Dalgorm) GetColumns(dst schema.Tabler, filter func(columnMeta 
dal.ColumnMeta) bool) (cms []dal.ColumnMeta, err error) {
        columnTypes, err := d.db.Migrator().ColumnTypes(dst.TableName())
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 810609c9..2097dc10 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -73,6 +73,8 @@ type Dal interface {
        Create(entity interface{}, clauses ...Clause) error
        // Update updates record
        Update(entity interface{}, clauses ...Clause) error
+       // UpdateColumns batch records in database
+       UpdateColumns(entity interface{}, clauses ...Clause) error
        // CreateOrUpdate tries to create the record, or fallback to update all 
if failed
        CreateOrUpdate(entity interface{}, clauses ...Clause) error
        // CreateIfNotExist tries to create the record if not exist
diff --git a/plugins/gitlab/api/blueprint.go b/plugins/gitlab/api/blueprint.go
index e0019610..9c35cab2 100644
--- a/plugins/gitlab/api/blueprint.go
+++ b/plugins/gitlab/api/blueprint.go
@@ -108,7 +108,7 @@ func MakePipelinePlan(subtaskMetas []core.SubTaskMeta, 
connectionId uint64, scop
                                },
                                10*time.Second,
                                connection.Proxy,
-                               basicRes,
+                               BasicRes,
                        )
                        if err != nil {
                                return nil, err
diff --git a/plugins/gitlab/api/connection.go b/plugins/gitlab/api/connection.go
index 957fe682..5f953cd1 100644
--- a/plugins/gitlab/api/connection.go
+++ b/plugins/gitlab/api/connection.go
@@ -60,7 +60,7 @@ func TestConnection(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, erro
                },
                3*time.Second,
                connection.Proxy,
-               basicRes,
+               BasicRes,
        )
        if err != nil {
                return nil, err
diff --git a/plugins/gitlab/api/init.go b/plugins/gitlab/api/init.go
index 6774e148..f1fb68f9 100644
--- a/plugins/gitlab/api/init.go
+++ b/plugins/gitlab/api/init.go
@@ -27,13 +27,13 @@ import (
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
-var basicRes core.BasicRes
+var BasicRes core.BasicRes
 
 func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       basicRes = helper.NewDefaultBasicRes(config, logger, database)
+       BasicRes = helper.NewDefaultBasicRes(config, logger, database)
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
-               basicRes,
+               BasicRes,
                vld,
        )
 }
diff --git a/plugins/gitlab/api/proxy.go b/plugins/gitlab/api/proxy.go
index ae56772f..70f9b23c 100644
--- a/plugins/gitlab/api/proxy.go
+++ b/plugins/gitlab/api/proxy.go
@@ -47,7 +47,7 @@ func Proxy(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, error) {
                },
                30*time.Second,
                connection.Proxy,
-               basicRes,
+               BasicRes,
        )
        if err != nil {
                return nil, err
diff --git 
a/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go 
b/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
index 903b0073..e2009c78 100644
--- a/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
+++ b/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
@@ -19,7 +19,11 @@ package migrationscripts
 
 import (
        "context"
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/plugins/gitlab/api"
+       "github.com/apache/incubator-devlake/plugins/helper"
        "gorm.io/gorm"
+       "reflect"
 )
 
 type fixDurationToFloat8 struct{}
@@ -41,22 +45,23 @@ func (*fixDurationToFloat8) Up(ctx context.Context, db 
*gorm.DB) error {
        if err != nil {
                return err
        }
-
        cursor, err := 
db.Model(&GitlabJob20220906{}).Select([]string{"connection_id", "gitlab_id", 
"duration"}).Rows()
        if err != nil {
                return err
        }
+       batch, err := helper.NewBatchUpdate(api.BasicRes, 
reflect.TypeOf(&GitlabJob20220906{}), 500)
+       if err != nil {
+               return errors.Default.Wrap(err, "error getting batch from 
table", errors.UserMessage("Internal Converter execution error"))
+       }
+       defer batch.Close()
        for cursor.Next() {
                job := GitlabJob20220906{}
                err = db.ScanRows(cursor, &job)
                if err != nil {
                        return err
                }
-               err = db.
-                       Model(job).
-                       Where(`connection_id=? AND gitlab_id=?`, 
job.ConnectionId, job.GitlabId).
-                       Update(`duration2`, job.Duration).
-                       Error
+               job.Duration2 = job.Duration
+               err = batch.Add(&job)
                if err != nil {
                        return err
                }
diff --git a/plugins/helper/batch_save.go b/plugins/helper/batch_save.go
index c76b00ac..d6990a3f 100644
--- a/plugins/helper/batch_save.go
+++ b/plugins/helper/batch_save.go
@@ -42,6 +42,11 @@ type BatchSave struct {
        primaryKey []reflect.StructField
 }
 
+// BatchUpdate will update records by batch
+type BatchUpdate struct {
+       *BatchSave
+}
+
 // NewBatchSave creates a new BatchSave instance
 func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int) 
(*BatchSave, error) {
        if slotType.Kind() != reflect.Ptr {
@@ -67,8 +72,49 @@ func NewBatchSave(basicRes core.BasicRes, slotType 
reflect.Type, size int) (*Bat
        }, nil
 }
 
+// NewBatchUpdate creates a new BatchUpdate instance
+func NewBatchUpdate(basicRes core.BasicRes, slotType reflect.Type, size int) 
(*BatchUpdate, error) {
+       batchSave, err := NewBatchSave(basicRes, slotType, size)
+       if err != nil {
+               return nil, err
+       }
+       batchUpdate := BatchUpdate{
+               BatchSave: batchSave,
+       }
+       return &batchUpdate, nil
+}
+
 // Add record to cache. BatchSave would flush them into Database when cache is 
max out
 func (c *BatchSave) Add(slot interface{}) error {
+       err := c.prepareForFlush(slot)
+       if err != nil {
+               return err
+       }
+       // flush out into database if max outed
+       if c.current == c.size {
+               return c.Flush()
+       } else if c.current%100 == 0 {
+               c.log.Debug("batch save current: %d", c.current)
+       }
+       return nil
+}
+
+// Update record to cache. BatchSave would flush them into Database when cache 
is max out
+func (c *BatchUpdate) Update(slot interface{}) error {
+       err := c.prepareForFlush(slot)
+       if err != nil {
+               return err
+       }
+       // flush out into database if max outed
+       if c.current == c.size {
+               return c.FlushUpdate()
+       } else if c.current%100 == 0 {
+               c.log.Debug("batch save current: %d", c.current)
+       }
+       return nil
+}
+
+func (c *BatchSave) prepareForFlush(slot interface{}) error {
        // type checking
        if reflect.TypeOf(slot) != c.slotType {
                return errors.Default.New("sub cache type mismatched")
@@ -89,12 +135,6 @@ func (c *BatchSave) Add(slot interface{}) error {
        }
        c.slots.Index(c.current).Set(reflect.ValueOf(slot))
        c.current++
-       // flush out into database if max outed
-       if c.current == c.size {
-               return c.Flush()
-       } else if c.current%100 == 0 {
-               c.log.Debug("batch save current: %d", c.current)
-       }
        return nil
 }
 
@@ -110,6 +150,18 @@ func (c *BatchSave) Flush() error {
        return nil
 }
 
+// FlushUpdate update cached records into database
+func (c *BatchUpdate) FlushUpdate() error {
+       err := c.db.UpdateColumns(c.slots.Slice(0, c.current).Interface())
+       if err != nil {
+               return err
+       }
+       c.log.Debug("batch save flush total %d records to database", c.current)
+       c.current = 0
+       c.valueIndex = make(map[string]int)
+       return nil
+}
+
 // Close would flash the cache and release resources
 func (c *BatchSave) Close() error {
        if c.current > 0 {
@@ -118,6 +170,14 @@ func (c *BatchSave) Close() error {
        return nil
 }
 
+// Close would flash the cache and release resources
+func (c *BatchUpdate) Close() error {
+       if c.current > 0 {
+               return c.FlushUpdate()
+       }
+       return nil
+}
+
 func getKeyValue(iface interface{}, primaryKey []reflect.StructField) string {
        var ss []string
        ifv := reflect.ValueOf(iface)

Reply via email to