This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new af3b9b7e Refactor helper to replace `GetDb` with `GetDal` (#2184)
af3b9b7e is described below

commit af3b9b7e2a49b528b23854db630e98e92cb99ce1
Author: Klesh Wong <[email protected]>
AuthorDate: Tue Jun 14 16:24:00 2022 +0800

    Refactor helper to replace `GetDb` with `GetDal` (#2184)
    
    * fix: ignore '.so' files when loading plugins
    
    * fix: missing mock for method `Nested`
    
    * fix: worker errors was not caught
    
    * fix: worker_scheduler wouldn't halt on error
    
    * refactor: replace `GetDb` with `GetDal`
    
      Closes #2183
    
    * fix: unit test
---
 impl/dalgorm/dalgorm.go                          |  7 ++
 plugins/core/dal/dal.go                          |  2 +
 plugins/gitextractor/main.go                     |  3 +-
 plugins/gitextractor/store/database.go           | 25 ++++---
 plugins/gitextractor/tasks/git_repo_collector.go |  3 +-
 plugins/helper/api_extractor.go                  | 40 ++++--------
 plugins/helper/batch_save.go                     | 38 ++++++-----
 plugins/helper/batch_save_divider.go             | 70 ++++++++++++--------
 plugins/helper/batch_save_divider_test.go        | 83 ++++++++++++++++++++++++
 plugins/helper/batch_save_divider_test.go.old    | 66 -------------------
 plugins/helper/data_convertor.go                 | 31 ++-------
 11 files changed, 196 insertions(+), 172 deletions(-)

diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 5a340b02..5fc897d0 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -107,6 +107,13 @@ func (d *Dalgorm) First(dst interface{}, clauses 
...dal.Clause) error {
        return buildTx(d.db, clauses).First(dst).Error
 }
 
+// Count total records
+func (d *Dalgorm) Count(clauses ...dal.Clause) (int64, error) {
+       var count int64
+       err := buildTx(d.db, clauses).Count(&count).Error
+       return count, err
+}
+
 // Pluck used to query single column
 func (d *Dalgorm) Pluck(column string, dest interface{}, clauses 
...dal.Clause) error {
        return buildTx(d.db, clauses).Pluck(column, dest).Error
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index daf43b4a..5f97df5a 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -44,6 +44,8 @@ type Dal interface {
        All(dst interface{}, clauses ...Clause) error
        // First loads first matched row from database to `dst`, error will be 
returned if no records were found
        First(dst interface{}, clauses ...Clause) error
+       // Count returns the number of matched records in database, USE IT WITH CAUTION!!
+       Count(clauses ...Clause) (int64, error)
        // Pluck used to query single column
        Pluck(column string, dest interface{}, clauses ...Clause) error
        // Create insert record to database
diff --git a/plugins/gitextractor/main.go b/plugins/gitextractor/main.go
index 300ae5b0..337b274e 100644
--- a/plugins/gitextractor/main.go
+++ b/plugins/gitextractor/main.go
@@ -61,7 +61,8 @@ func main() {
                if err != nil {
                        panic(err)
                }
-               storage = store.NewDatabase(database, log)
+               basicRes := helper.NewDefaultBasicRes(nil, log, database)
+               storage = store.NewDatabase(basicRes, *url)
        } else {
                panic("either specify `-output` or `-db` argument as 
destination")
        }
diff --git a/plugins/gitextractor/store/database.go 
b/plugins/gitextractor/store/database.go
index 4e264fc6..68be40b5 100644
--- a/plugins/gitextractor/store/database.go
+++ b/plugins/gitextractor/store/database.go
@@ -18,11 +18,12 @@ limitations under the License.
 package store
 
 import (
+       "fmt"
+       "reflect"
+
        "github.com/apache/incubator-devlake/models/domainlayer/code"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "gorm.io/gorm"
-       "reflect"
 )
 
 const BathSize = 100
@@ -33,15 +34,19 @@ type Database struct {
        log    core.Logger
 }
 
-func NewDatabase(db *gorm.DB, log core.Logger) *Database {
+func NewDatabase(basicRes core.BasicRes, repoUrl string) *Database {
        database := new(Database)
-       database.driver = helper.NewBatchSaveDivider(db, BathSize)
-       database.log = log
+       database.driver = helper.NewBatchSaveDivider(
+               basicRes,
+               BathSize,
+               "gitextractor",
+               fmt.Sprintf(`{"RepoUrl": "%s"}`, repoUrl),
+       )
        return database
 }
 
 func (d *Database) RepoCommits(repoCommit *code.RepoCommit) error {
-       batch, err := d.driver.ForType(reflect.TypeOf(repoCommit), d.log)
+       batch, err := d.driver.ForType(reflect.TypeOf(repoCommit))
        if err != nil {
                return err
        }
@@ -49,7 +54,7 @@ func (d *Database) RepoCommits(repoCommit *code.RepoCommit) 
error {
 }
 
 func (d *Database) Commits(commit *code.Commit) error {
-       batch, err := d.driver.ForType(reflect.TypeOf(commit), d.log)
+       batch, err := d.driver.ForType(reflect.TypeOf(commit))
        if err != nil {
                return err
        }
@@ -57,7 +62,7 @@ func (d *Database) Commits(commit *code.Commit) error {
 }
 
 func (d *Database) Refs(ref *code.Ref) error {
-       batch, err := d.driver.ForType(reflect.TypeOf(ref), d.log)
+       batch, err := d.driver.ForType(reflect.TypeOf(ref))
        if err != nil {
                return err
        }
@@ -65,7 +70,7 @@ func (d *Database) Refs(ref *code.Ref) error {
 }
 
 func (d *Database) CommitFiles(file *code.CommitFile) error {
-       batch, err := d.driver.ForType(reflect.TypeOf(file), d.log)
+       batch, err := d.driver.ForType(reflect.TypeOf(file))
        if err != nil {
                return err
        }
@@ -76,7 +81,7 @@ func (d *Database) CommitParents(pp []*code.CommitParent) 
error {
        if len(pp) == 0 {
                return nil
        }
-       batch, err := d.driver.ForType(reflect.TypeOf(pp[0]), d.log)
+       batch, err := d.driver.ForType(reflect.TypeOf(pp[0]))
        if err != nil {
                return err
        }
diff --git a/plugins/gitextractor/tasks/git_repo_collector.go 
b/plugins/gitextractor/tasks/git_repo_collector.go
index 4dcb8cd2..c5b86bd6 100644
--- a/plugins/gitextractor/tasks/git_repo_collector.go
+++ b/plugins/gitextractor/tasks/git_repo_collector.go
@@ -55,9 +55,8 @@ func (o GitExtractorOptions) Valid() error {
 
 func CollectGitRepo(subTaskCtx core.SubTaskContext) error {
        var err error
-       db := subTaskCtx.GetDb()
-       storage := store.NewDatabase(db, subTaskCtx.GetLogger())
        op := subTaskCtx.GetData().(GitExtractorOptions)
+       storage := store.NewDatabase(subTaskCtx, op.Url)
        p := parser.NewLibGit2(storage, subTaskCtx)
        if strings.HasPrefix(op.Url, "http") {
                err = p.CloneOverHTTP(op.RepoId, op.Url, op.User, op.Password, 
op.Proxy)
diff --git a/plugins/helper/api_extractor.go b/plugins/helper/api_extractor.go
index fc09c27e..25fe4b95 100644
--- a/plugins/helper/api_extractor.go
+++ b/plugins/helper/api_extractor.go
@@ -18,7 +18,6 @@ limitations under the License.
 package helper
 
 import (
-       "fmt"
        "reflect"
 
        "github.com/apache/incubator-devlake/models/common"
@@ -37,12 +36,13 @@ type ApiExtractorArgs struct {
 // It reads rows from specified raw data table, and feed it into `Extract` 
handler
 // you can return arbitrary tool layer entities in this handler, ApiExtractor 
would
 // first delete old data by their RawDataOrigin information, and then perform a
-// batch insertion for you.
+// batch save for you.
 type ApiExtractor struct {
        *RawDataSubTask
        args *ApiExtractorArgs
 }
 
+// NewApiExtractor creates a new ApiExtractor
 func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor, error) {
        // process args
        rawDataSubTask, err := newRawDataSubTask(args.RawDataSubTaskArgs)
@@ -58,6 +58,7 @@ func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor, 
error) {
        }, nil
 }
 
+// Execute sub-task
 func (extractor *ApiExtractor) Execute() error {
        // load data from database
        db := extractor.args.Ctx.GetDb()
@@ -76,25 +77,9 @@ func (extractor *ApiExtractor) Execute() error {
        defer cursor.Close()
        row := &RawData{}
 
-       // batch insertion divider
+       // batch save divider
        RAW_DATA_ORIGIN := "RawDataOrigin"
-       divider := NewBatchSaveDivider(db, extractor.args.BatchSize)
-       divider.OnNewBatchSave(func(rowType reflect.Type) error {
-               // check if row type has RawDataOrigin
-               if rawDataOrigin, ok := 
rowType.Elem().FieldByName(RAW_DATA_ORIGIN); ok {
-                       if (rawDataOrigin.Type != 
reflect.TypeOf(common.RawDataOrigin{})) {
-                               return fmt.Errorf("type %s must nested 
RawDataOrigin struct", rowType.Name())
-                       }
-               } else {
-                       return fmt.Errorf("type %s must nested RawDataOrigin 
struct", rowType.Name())
-               }
-               // delete old data
-               return db.Delete(
-                       reflect.New(rowType).Interface(),
-                       "_raw_data_table = ? AND _raw_data_params = ?",
-                       extractor.table, extractor.params,
-               ).Error
-       })
+       divider := NewBatchSaveDivider(extractor.args.Ctx, 
extractor.args.BatchSize, extractor.table, extractor.params)
 
        // prgress
        extractor.args.Ctx.SetProgress(0, -1)
@@ -118,16 +103,19 @@ func (extractor *ApiExtractor) Execute() error {
 
                for _, result := range results {
                        // get the batch operator for the specific type
-                       batch, err := divider.ForType(reflect.TypeOf(result), 
log)
+                       batch, err := divider.ForType(reflect.TypeOf(result))
                        if err != nil {
                                return err
                        }
                        // set raw data origin field
-                       
reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN).Set(reflect.ValueOf(common.RawDataOrigin{
-                               RawDataTable:  extractor.table,
-                               RawDataId:     row.ID,
-                               RawDataParams: row.Params,
-                       }))
+                       origin := 
reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
+                       if origin.IsValid() {
+                               origin.Set(reflect.ValueOf(common.RawDataOrigin{
+                                       RawDataTable:  extractor.table,
+                                       RawDataId:     row.ID,
+                                       RawDataParams: row.Params,
+                               }))
+                       }
                        // records get saved into db when slots were max outed
                        err = batch.Add(result)
                        if err != nil {
diff --git a/plugins/helper/batch_save.go b/plugins/helper/batch_save.go
index a5d66a7f..28643fee 100644
--- a/plugins/helper/batch_save.go
+++ b/plugins/helper/batch_save.go
@@ -19,47 +19,50 @@ package helper
 
 import (
        "fmt"
-       "github.com/apache/incubator-devlake/plugins/core"
-       "gorm.io/gorm"
-       "gorm.io/gorm/clause"
        "reflect"
        "strings"
+
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
-// Insert data by batch can increase database performance drastically, this 
class aim to make batch-save easier,
-// It takes care the database operation for specified `slotType`, records got 
saved into database whenever cache hits
-// The `size` limit, remember to call the `Close` method to save the last batch
+// BatchSave performs multiple records persistence of a specific type in one 
sql query to improve the performance
 type BatchSave struct {
+       basicRes core.BasicRes
+       log      core.Logger
+       db       dal.Dal
        slotType reflect.Type
        // slots can not be []interface{}, because gorm wouldn't take it
        // I'm guessing the reason is the type information lost when converted 
to interface{}
        slots      reflect.Value
-       db         *gorm.DB
-       logger     core.Logger
        current    int
        size       int
        valueIndex map[string]int
 }
 
-func NewBatchSave(db *gorm.DB, log core.Logger, slotType reflect.Type, size 
int) (*BatchSave, error) {
+const BATCH_SAVE_UPDATE_ONLY = 0
+
+// NewBatchSave creates a new BatchSave instance
+func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int) 
(*BatchSave, error) {
        if slotType.Kind() != reflect.Ptr {
                return nil, fmt.Errorf("slotType must be a pointer")
        }
        if !hasPrimaryKey(slotType) {
                return nil, fmt.Errorf("%s no primary key", slotType.String())
        }
-       log = log.Nested(slotType.String())
-       log.Info("create batch save success")
+       log := basicRes.GetLogger().Nested(slotType.String())
        return &BatchSave{
+               basicRes:   basicRes,
+               log:        log,
+               db:         basicRes.GetDal(),
                slotType:   slotType,
-               logger:     log,
                slots:      reflect.MakeSlice(reflect.SliceOf(slotType), size, 
size),
-               db:         db,
                size:       size,
                valueIndex: make(map[string]int),
        }, nil
 }
 
+// Add record to cache. BatchSave would flush them into Database when cache is 
maxed out
 func (c *BatchSave) Add(slot interface{}) error {
        // type checking
        if reflect.TypeOf(slot) != c.slotType {
@@ -85,23 +88,24 @@ func (c *BatchSave) Add(slot interface{}) error {
        if c.current == c.size {
                return c.Flush()
        } else if c.current%100 == 0 {
-               c.logger.Debug("batch save current: %d", c.current)
+               c.log.Debug("batch save current: %d", c.current)
        }
        return nil
 }
 
+// Flush save cached records into database
 func (c *BatchSave) Flush() error {
-       result := c.db.Clauses(clause.OnConflict{UpdateAll: 
true}).Create(c.slots.Slice(0, c.current).Interface())
-       err := result.Error
+       err := c.db.CreateOrUpdate(c.slots.Slice(0, c.current).Interface())
        if err != nil {
                return err
        }
-       c.logger.Info("batch save flush %d and %d success", c.slots.Slice(0, 
c.current).Len(), result.RowsAffected)
+       c.log.Debug("batch save flush total %d records to database", c.current)
        c.current = 0
        c.valueIndex = make(map[string]int)
        return nil
 }
 
+// Close would flush the cache and release resources
 func (c *BatchSave) Close() error {
        if c.current > 0 {
                return c.Flush()
diff --git a/plugins/helper/batch_save_divider.go 
b/plugins/helper/batch_save_divider.go
index 63003e98..6f1cbffb 100644
--- a/plugins/helper/batch_save_divider.go
+++ b/plugins/helper/batch_save_divider.go
@@ -18,58 +18,76 @@ limitations under the License.
 package helper
 
 import (
-       "github.com/apache/incubator-devlake/plugins/core"
+       "fmt"
        "reflect"
 
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/models/common"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
-type OnNewBatchSave func(rowType reflect.Type) error
-
-// Holds a map of BatchInsert, return `*BatchInsert` for a specific records, 
so caller can do batch operation for it
+// BatchSaveDivider creates and caches BatchSave, this is helpful when dealing 
with massive amount of data records
+// with arbitrary types.
 type BatchSaveDivider struct {
-       db             *gorm.DB
-       batches        map[reflect.Type]*BatchSave
-       batchSize      int
-       onNewBatchSave OnNewBatchSave
+       basicRes  core.BasicRes
+       log       core.Logger
+       db        dal.Dal
+       batches   map[reflect.Type]*BatchSave
+       batchSize int
+       table     string
+       params    string
 }
 
-// Return a new BatchInsertDivider instance
-func NewBatchSaveDivider(db *gorm.DB, batchSize int) *BatchSaveDivider {
+// NewBatchSaveDivider create a new BatchInsertDivider instance
+func NewBatchSaveDivider(basicRes core.BasicRes, batchSize int, table string, 
params string) *BatchSaveDivider {
+       log := basicRes.GetLogger().Nested("batch divider")
        return &BatchSaveDivider{
-               db:        db,
+               basicRes:  basicRes,
+               log:       log,
+               db:        basicRes.GetDal(),
                batches:   make(map[reflect.Type]*BatchSave),
                batchSize: batchSize,
+               table:     table,
+               params:    params,
        }
 }
 
-func (d *BatchSaveDivider) OnNewBatchSave(cb OnNewBatchSave) {
-       d.onNewBatchSave = cb
-}
-
-// return *BatchSave for specified type
-func (d *BatchSaveDivider) ForType(rowType reflect.Type, log core.Logger) 
(*BatchSave, error) {
+// ForType returns a `BatchSave` instance for specific type
+func (d *BatchSaveDivider) ForType(rowType reflect.Type) (*BatchSave, error) {
        // get the cache for the specific type
        batch := d.batches[rowType]
        var err error
        // create one if not exists
        if batch == nil {
-               batch, err = NewBatchSave(d.db, log, rowType, d.batchSize)
+               batch, err = NewBatchSave(d.basicRes, rowType, d.batchSize)
                if err != nil {
                        return nil, err
                }
-               if d.onNewBatchSave != nil {
-                       err = d.onNewBatchSave(rowType)
-                       if err != nil {
-                               return nil, err
-                       }
-               }
                d.batches[rowType] = batch
+               // delete outdated records if rowType was not PartialUpdate
+               rowElemType := rowType.Elem()
+               d.log.Debug("missing BatchSave for type %s", rowElemType.Name())
+               row := reflect.New(rowElemType).Interface()
+       // check if rowType had RawDataOrigin embedded
+               field, hasField := rowElemType.FieldByName("RawDataOrigin")
+               if !hasField || field.Type != 
reflect.TypeOf(common.RawDataOrigin{}) {
+                       return nil, fmt.Errorf("type %s must have RawDataOrigin 
embeded", rowElemType.Name())
+               }
+               // all good, delete outdated records before we insertion
+               d.log.Debug("deleting outdate records for %s", 
rowElemType.Name())
+               d.db.Delete(
+                       row,
+                       dal.Where("_raw_data_table = ? AND _raw_data_params = 
?", d.table, d.params),
+               )
        }
        return batch, nil
 }
 
-// close all batches so all rest records get saved into db as well
+func (d *BatchSaveDivider) flushBatch() {
+
+}
+
+// Close all batches so the remaining records get saved into the db
 func (d *BatchSaveDivider) Close() error {
        for _, batch := range d.batches {
                err := batch.Close()
diff --git a/plugins/helper/batch_save_divider_test.go 
b/plugins/helper/batch_save_divider_test.go
new file mode 100644
index 00000000..ae933a05
--- /dev/null
+++ b/plugins/helper/batch_save_divider_test.go
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package helper
+
+import (
+       "reflect"
+       "testing"
+       "time"
+
+       "github.com/apache/incubator-devlake/helpers/unithelper"
+       "github.com/apache/incubator-devlake/mocks"
+       "github.com/apache/incubator-devlake/models/common"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/mock"
+)
+
+type MockJirIssueBsd struct {
+       common.RawDataOrigin
+       ID               string `gorm:"primaryKey"`
+       Title            string
+       ChangelogUpdated *time.Time
+}
+
+type MockJiraChangelogBsd struct {
+       common.RawDataOrigin
+       ID string `gorm:"primaryKey"`
+}
+
+type MockJiraIssueChangelogUpdatedBsd struct {
+       ID               string `gorm:"primaryKey"`
+       ChangelogUpdated *time.Time
+}
+
+func (MockJiraIssueChangelogUpdatedBsd) PartialUpdate() {}
+
+func TestBatchSaveDivider(t *testing.T) {
+       mockDal := new(mocks.Dal)
+
+       mockLog := unithelper.DummyLogger()
+       mockRes := new(mocks.BasicRes)
+       mockRes.On("GetDal").Return(mockDal)
+       mockRes.On("GetLogger").Return(mockLog)
+
+       divider := NewBatchSaveDivider(mockRes, 10, "", "")
+
+       // we expect total 2 deletion calls after all code got carried out
+       mockDal.On("Delete", mock.Anything, mock.Anything).Return(nil).Twice()
+
+       // for same type should return the same BatchSave
+       jiraIssue1, err := divider.ForType(reflect.TypeOf(&MockJirIssueBsd{}))
+       assert.Nil(t, err)
+
+       jiraIssue2, err := divider.ForType(reflect.TypeOf(&MockJirIssueBsd{}))
+       assert.Nil(t, err)
+       assert.Equal(t, jiraIssue1, jiraIssue2)
+
+       // for different types should return different BatchSaves
+       jiraChangelog1, err := 
divider.ForType(reflect.TypeOf(&MockJiraChangelogBsd{}))
+       assert.Nil(t, err)
+
+       jiraChangelog2, err := 
divider.ForType(reflect.TypeOf(&MockJiraChangelogBsd{}))
+       assert.Nil(t, err)
+       assert.Equal(t, jiraChangelog1, jiraChangelog2)
+
+       // assertion
+       mockDal.AssertExpectations(t)
+}
diff --git a/plugins/helper/batch_save_divider_test.go.old 
b/plugins/helper/batch_save_divider_test.go.old
deleted file mode 100644
index 42991a33..00000000
--- a/plugins/helper/batch_save_divider_test.go.old
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package helper
-
-import (
-       "github.com/apache/incubator-devlake/logger"
-       "reflect"
-       "testing"
-
-       "github.com/stretchr/testify/assert"
-       "gorm.io/gorm"
-)
-
-// go test -gcflags=all=-l
-
-var TestBatchSize int = 100
-
-func CreateTestBatchSaveDivider() *BatchSaveDivider {
-       return NewBatchSaveDivider(&gorm.DB{}, TestBatchSize)
-}
-
-func TestBatchSaveDivider(t *testing.T) {
-       MockDB(t)
-       defer UnMockDB()
-       batchSaveDivider := CreateTestBatchSaveDivider()
-       initTimes := 0
-
-       batchSaveDivider.OnNewBatchSave(func(rowType reflect.Type) error {
-               initTimes++
-               return nil
-       })
-
-       var err error
-       var b1 *BatchSave
-       var b2 *BatchSave
-       var b3 *BatchSave
-
-       // test if it saved and only saved once for one Type
-       b1, err = batchSaveDivider.ForType(reflect.TypeOf(TestTableData), 
logger.Global)
-       assert.Equal(t, initTimes, 1)
-       assert.Equal(t, err, nil)
-       b2, err = batchSaveDivider.ForType(reflect.TypeOf(&TestTable2{}), 
logger.Global)
-       assert.Equal(t, initTimes, 2)
-       assert.Equal(t, err, nil)
-       b3, err = batchSaveDivider.ForType(reflect.TypeOf(TestTableData), 
logger.Global)
-       assert.Equal(t, initTimes, 2)
-       assert.Equal(t, err, nil)
-
-       assert.NotEqual(t, b1, b2)
-       assert.Equal(t, b1, b3)
-}
diff --git a/plugins/helper/data_convertor.go b/plugins/helper/data_convertor.go
index 44a9affb..071d5f69 100644
--- a/plugins/helper/data_convertor.go
+++ b/plugins/helper/data_convertor.go
@@ -19,11 +19,8 @@ package helper
 
 import (
        "database/sql"
-       "fmt"
        "reflect"
 
-       "github.com/apache/incubator-devlake/models/common"
-
        "github.com/apache/incubator-devlake/plugins/core"
 )
 
@@ -69,25 +66,9 @@ func (converter *DataConverter) Execute() error {
        // load data from database
        db := converter.args.Ctx.GetDb()
 
-       // batch insertion divider
+       // batch save divider
        RAW_DATA_ORIGIN := "RawDataOrigin"
-       divider := NewBatchSaveDivider(db, converter.args.BatchSize)
-       divider.OnNewBatchSave(func(rowType reflect.Type) error {
-               // check if row type has RawDataOrigin
-               if rawDataOrigin, ok := 
rowType.Elem().FieldByName(RAW_DATA_ORIGIN); ok {
-                       if (rawDataOrigin.Type != 
reflect.TypeOf(common.RawDataOrigin{})) {
-                               return fmt.Errorf("type %s must nested 
RawDataOrigin struct", rowType.Name())
-                       }
-               } else {
-                       return fmt.Errorf("type %s must nested RawDataOrigin 
struct", rowType.Name())
-               }
-               // delete old data
-               return db.Delete(
-                       reflect.New(rowType).Interface(),
-                       "_raw_data_table = ? AND _raw_data_params = ?",
-                       converter.table, converter.params,
-               ).Error
-       })
+       divider := NewBatchSaveDivider(converter.args.Ctx, 
converter.args.BatchSize, converter.table, converter.params)
 
        // prgress
        converter.args.Ctx.SetProgress(0, -1)
@@ -115,13 +96,15 @@ func (converter *DataConverter) Execute() error {
 
                for _, result := range results {
                        // get the batch operator for the specific type
-                       batch, err := divider.ForType(reflect.TypeOf(result), 
converter.args.Ctx.GetLogger())
+                       batch, err := divider.ForType(reflect.TypeOf(result))
                        if err != nil {
                                return err
                        }
                        // set raw data origin field
-                       
reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN).
-                               
Set(reflect.ValueOf(inputRow).Elem().FieldByName(RAW_DATA_ORIGIN))
+                       origin := 
reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
+                       if origin.IsValid() {
+                               
origin.Set(reflect.ValueOf(inputRow).Elem().FieldByName(RAW_DATA_ORIGIN))
+                       }
                        // records get saved into db when slots were max outed
                        err = batch.Add(result)
                        if err != nil {

Reply via email to