This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new af3b9b7e Refactor helper to replace `GetDb` with `GetDal` (#2184)
af3b9b7e is described below
commit af3b9b7e2a49b528b23854db630e98e92cb99ce1
Author: Klesh Wong <[email protected]>
AuthorDate: Tue Jun 14 16:24:00 2022 +0800
Refactor helper to replace `GetDb` with `GetDal` (#2184)
* fix: ignore '.so' files when loading plugins
* fix: missing mock for method `Nested`
* fix: worker errors was not caught
* fix: worker_scheduler wouldn't halt on error
* refactor: replace `GetDb` with `GetDal`
Closes #2183
* fix: unit test
---
impl/dalgorm/dalgorm.go | 7 ++
plugins/core/dal/dal.go | 2 +
plugins/gitextractor/main.go | 3 +-
plugins/gitextractor/store/database.go | 25 ++++---
plugins/gitextractor/tasks/git_repo_collector.go | 3 +-
plugins/helper/api_extractor.go | 40 ++++--------
plugins/helper/batch_save.go | 38 ++++++-----
plugins/helper/batch_save_divider.go | 70 ++++++++++++--------
plugins/helper/batch_save_divider_test.go | 83 ++++++++++++++++++++++++
plugins/helper/batch_save_divider_test.go.old | 66 -------------------
plugins/helper/data_convertor.go | 31 ++-------
11 files changed, 196 insertions(+), 172 deletions(-)
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 5a340b02..5fc897d0 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -107,6 +107,13 @@ func (d *Dalgorm) First(dst interface{}, clauses
...dal.Clause) error {
return buildTx(d.db, clauses).First(dst).Error
}
+// Count total records
+func (d *Dalgorm) Count(clauses ...dal.Clause) (int64, error) {
+ var count int64
+ err := buildTx(d.db, clauses).Count(&count).Error
+ return count, err
+}
+
// Pluck used to query single column
func (d *Dalgorm) Pluck(column string, dest interface{}, clauses
...dal.Clause) error {
return buildTx(d.db, clauses).Pluck(column, dest).Error
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index daf43b4a..5f97df5a 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -44,6 +44,8 @@ type Dal interface {
All(dst interface{}, clauses ...Clause) error
// First loads first matched row from database to `dst`, error will be
returned if no records were found
First(dst interface{}, clauses ...Clause) error
+ // Count returns the number of matched rows for the given clauses
+ Count(clauses ...Clause) (int64, error)
// Pluck used to query single column
Pluck(column string, dest interface{}, clauses ...Clause) error
// Create insert record to database
diff --git a/plugins/gitextractor/main.go b/plugins/gitextractor/main.go
index 300ae5b0..337b274e 100644
--- a/plugins/gitextractor/main.go
+++ b/plugins/gitextractor/main.go
@@ -61,7 +61,8 @@ func main() {
if err != nil {
panic(err)
}
- storage = store.NewDatabase(database, log)
+ basicRes := helper.NewDefaultBasicRes(nil, log, database)
+ storage = store.NewDatabase(basicRes, *url)
} else {
panic("either specify `-output` or `-db` argument as
destination")
}
diff --git a/plugins/gitextractor/store/database.go
b/plugins/gitextractor/store/database.go
index 4e264fc6..68be40b5 100644
--- a/plugins/gitextractor/store/database.go
+++ b/plugins/gitextractor/store/database.go
@@ -18,11 +18,12 @@ limitations under the License.
package store
import (
+ "fmt"
+ "reflect"
+
"github.com/apache/incubator-devlake/models/domainlayer/code"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
- "gorm.io/gorm"
- "reflect"
)
const BathSize = 100
@@ -33,15 +34,19 @@ type Database struct {
log core.Logger
}
-func NewDatabase(db *gorm.DB, log core.Logger) *Database {
+func NewDatabase(basicRes core.BasicRes, repoUrl string) *Database {
database := new(Database)
- database.driver = helper.NewBatchSaveDivider(db, BathSize)
- database.log = log
+ database.driver = helper.NewBatchSaveDivider(
+ basicRes,
+ BathSize,
+ "gitextractor",
+ fmt.Sprintf(`{"RepoUrl": "%s"}`, repoUrl),
+ )
return database
}
func (d *Database) RepoCommits(repoCommit *code.RepoCommit) error {
- batch, err := d.driver.ForType(reflect.TypeOf(repoCommit), d.log)
+ batch, err := d.driver.ForType(reflect.TypeOf(repoCommit))
if err != nil {
return err
}
@@ -49,7 +54,7 @@ func (d *Database) RepoCommits(repoCommit *code.RepoCommit)
error {
}
func (d *Database) Commits(commit *code.Commit) error {
- batch, err := d.driver.ForType(reflect.TypeOf(commit), d.log)
+ batch, err := d.driver.ForType(reflect.TypeOf(commit))
if err != nil {
return err
}
@@ -57,7 +62,7 @@ func (d *Database) Commits(commit *code.Commit) error {
}
func (d *Database) Refs(ref *code.Ref) error {
- batch, err := d.driver.ForType(reflect.TypeOf(ref), d.log)
+ batch, err := d.driver.ForType(reflect.TypeOf(ref))
if err != nil {
return err
}
@@ -65,7 +70,7 @@ func (d *Database) Refs(ref *code.Ref) error {
}
func (d *Database) CommitFiles(file *code.CommitFile) error {
- batch, err := d.driver.ForType(reflect.TypeOf(file), d.log)
+ batch, err := d.driver.ForType(reflect.TypeOf(file))
if err != nil {
return err
}
@@ -76,7 +81,7 @@ func (d *Database) CommitParents(pp []*code.CommitParent)
error {
if len(pp) == 0 {
return nil
}
- batch, err := d.driver.ForType(reflect.TypeOf(pp[0]), d.log)
+ batch, err := d.driver.ForType(reflect.TypeOf(pp[0]))
if err != nil {
return err
}
diff --git a/plugins/gitextractor/tasks/git_repo_collector.go
b/plugins/gitextractor/tasks/git_repo_collector.go
index 4dcb8cd2..c5b86bd6 100644
--- a/plugins/gitextractor/tasks/git_repo_collector.go
+++ b/plugins/gitextractor/tasks/git_repo_collector.go
@@ -55,9 +55,8 @@ func (o GitExtractorOptions) Valid() error {
func CollectGitRepo(subTaskCtx core.SubTaskContext) error {
var err error
- db := subTaskCtx.GetDb()
- storage := store.NewDatabase(db, subTaskCtx.GetLogger())
op := subTaskCtx.GetData().(GitExtractorOptions)
+ storage := store.NewDatabase(subTaskCtx, op.Url)
p := parser.NewLibGit2(storage, subTaskCtx)
if strings.HasPrefix(op.Url, "http") {
err = p.CloneOverHTTP(op.RepoId, op.Url, op.User, op.Password,
op.Proxy)
diff --git a/plugins/helper/api_extractor.go b/plugins/helper/api_extractor.go
index fc09c27e..25fe4b95 100644
--- a/plugins/helper/api_extractor.go
+++ b/plugins/helper/api_extractor.go
@@ -18,7 +18,6 @@ limitations under the License.
package helper
import (
- "fmt"
"reflect"
"github.com/apache/incubator-devlake/models/common"
@@ -37,12 +36,13 @@ type ApiExtractorArgs struct {
// It reads rows from specified raw data table, and feed it into `Extract`
handler
// you can return arbitrary tool layer entities in this handler, ApiExtractor
would
// first delete old data by their RawDataOrigin information, and then perform a
-// batch insertion for you.
+// batch save for you.
type ApiExtractor struct {
*RawDataSubTask
args *ApiExtractorArgs
}
+// NewApiExtractor creates a new ApiExtractor
func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor, error) {
// process args
rawDataSubTask, err := newRawDataSubTask(args.RawDataSubTaskArgs)
@@ -58,6 +58,7 @@ func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor,
error) {
}, nil
}
+// Execute sub-task
func (extractor *ApiExtractor) Execute() error {
// load data from database
db := extractor.args.Ctx.GetDb()
@@ -76,25 +77,9 @@ func (extractor *ApiExtractor) Execute() error {
defer cursor.Close()
row := &RawData{}
- // batch insertion divider
+ // batch save divider
RAW_DATA_ORIGIN := "RawDataOrigin"
- divider := NewBatchSaveDivider(db, extractor.args.BatchSize)
- divider.OnNewBatchSave(func(rowType reflect.Type) error {
- // check if row type has RawDataOrigin
- if rawDataOrigin, ok :=
rowType.Elem().FieldByName(RAW_DATA_ORIGIN); ok {
- if (rawDataOrigin.Type !=
reflect.TypeOf(common.RawDataOrigin{})) {
- return fmt.Errorf("type %s must nested
RawDataOrigin struct", rowType.Name())
- }
- } else {
- return fmt.Errorf("type %s must nested RawDataOrigin
struct", rowType.Name())
- }
- // delete old data
- return db.Delete(
- reflect.New(rowType).Interface(),
- "_raw_data_table = ? AND _raw_data_params = ?",
- extractor.table, extractor.params,
- ).Error
- })
+ divider := NewBatchSaveDivider(extractor.args.Ctx,
extractor.args.BatchSize, extractor.table, extractor.params)
// progress
extractor.args.Ctx.SetProgress(0, -1)
@@ -118,16 +103,19 @@ func (extractor *ApiExtractor) Execute() error {
for _, result := range results {
// get the batch operator for the specific type
- batch, err := divider.ForType(reflect.TypeOf(result),
log)
+ batch, err := divider.ForType(reflect.TypeOf(result))
if err != nil {
return err
}
// set raw data origin field
-
reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN).Set(reflect.ValueOf(common.RawDataOrigin{
- RawDataTable: extractor.table,
- RawDataId: row.ID,
- RawDataParams: row.Params,
- }))
+ origin :=
reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
+ if origin.IsValid() {
+ origin.Set(reflect.ValueOf(common.RawDataOrigin{
+ RawDataTable: extractor.table,
+ RawDataId: row.ID,
+ RawDataParams: row.Params,
+ }))
+ }
// records get saved into db when slots were max outed
err = batch.Add(result)
if err != nil {
diff --git a/plugins/helper/batch_save.go b/plugins/helper/batch_save.go
index a5d66a7f..28643fee 100644
--- a/plugins/helper/batch_save.go
+++ b/plugins/helper/batch_save.go
@@ -19,47 +19,50 @@ package helper
import (
"fmt"
- "github.com/apache/incubator-devlake/plugins/core"
- "gorm.io/gorm"
- "gorm.io/gorm/clause"
"reflect"
"strings"
+
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
)
-// Insert data by batch can increase database performance drastically, this
class aim to make batch-save easier,
-// It takes care the database operation for specified `slotType`, records got
saved into database whenever cache hits
-// The `size` limit, remember to call the `Close` method to save the last batch
+// BatchSave performs multiple-record persistence of a specific type in one
sql query to improve the performance
type BatchSave struct {
+ basicRes core.BasicRes
+ log core.Logger
+ db dal.Dal
slotType reflect.Type
// slots can not be []interface{}, because gorm wouldn't take it
// I'm guessing the reason is the type information lost when converted
to interface{}
slots reflect.Value
- db *gorm.DB
- logger core.Logger
current int
size int
valueIndex map[string]int
}
-func NewBatchSave(db *gorm.DB, log core.Logger, slotType reflect.Type, size
int) (*BatchSave, error) {
+const BATCH_SAVE_UPDATE_ONLY = 0
+
+// NewBatchSave creates a new BatchSave instance
+func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int)
(*BatchSave, error) {
if slotType.Kind() != reflect.Ptr {
return nil, fmt.Errorf("slotType must be a pointer")
}
if !hasPrimaryKey(slotType) {
return nil, fmt.Errorf("%s no primary key", slotType.String())
}
- log = log.Nested(slotType.String())
- log.Info("create batch save success")
+ log := basicRes.GetLogger().Nested(slotType.String())
return &BatchSave{
+ basicRes: basicRes,
+ log: log,
+ db: basicRes.GetDal(),
slotType: slotType,
- logger: log,
slots: reflect.MakeSlice(reflect.SliceOf(slotType), size,
size),
- db: db,
size: size,
valueIndex: make(map[string]int),
}, nil
}
+// Add a record to the cache. BatchSave flushes them into the database when the
cache is maxed out
func (c *BatchSave) Add(slot interface{}) error {
// type checking
if reflect.TypeOf(slot) != c.slotType {
@@ -85,23 +88,24 @@ func (c *BatchSave) Add(slot interface{}) error {
if c.current == c.size {
return c.Flush()
} else if c.current%100 == 0 {
- c.logger.Debug("batch save current: %d", c.current)
+ c.log.Debug("batch save current: %d", c.current)
}
return nil
}
+// Flush saves cached records into the database
func (c *BatchSave) Flush() error {
- result := c.db.Clauses(clause.OnConflict{UpdateAll:
true}).Create(c.slots.Slice(0, c.current).Interface())
- err := result.Error
+ err := c.db.CreateOrUpdate(c.slots.Slice(0, c.current).Interface())
if err != nil {
return err
}
- c.logger.Info("batch save flush %d and %d success", c.slots.Slice(0,
c.current).Len(), result.RowsAffected)
+ c.log.Debug("batch save flush total %d records to database", c.current)
c.current = 0
c.valueIndex = make(map[string]int)
return nil
}
+// Close flushes the cache and releases resources
func (c *BatchSave) Close() error {
if c.current > 0 {
return c.Flush()
diff --git a/plugins/helper/batch_save_divider.go
b/plugins/helper/batch_save_divider.go
index 63003e98..6f1cbffb 100644
--- a/plugins/helper/batch_save_divider.go
+++ b/plugins/helper/batch_save_divider.go
@@ -18,58 +18,76 @@ limitations under the License.
package helper
import (
- "github.com/apache/incubator-devlake/plugins/core"
+ "fmt"
"reflect"
- "gorm.io/gorm"
+ "github.com/apache/incubator-devlake/models/common"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
)
-type OnNewBatchSave func(rowType reflect.Type) error
-
-// Holds a map of BatchInsert, return `*BatchInsert` for a specific records,
so caller can do batch operation for it
+// BatchSaveDivider creates and caches BatchSave, this is helpful when dealing
with massive amount of data records
+// with arbitrary types.
type BatchSaveDivider struct {
- db *gorm.DB
- batches map[reflect.Type]*BatchSave
- batchSize int
- onNewBatchSave OnNewBatchSave
+ basicRes core.BasicRes
+ log core.Logger
+ db dal.Dal
+ batches map[reflect.Type]*BatchSave
+ batchSize int
+ table string
+ params string
}
-// Return a new BatchInsertDivider instance
-func NewBatchSaveDivider(db *gorm.DB, batchSize int) *BatchSaveDivider {
+// NewBatchSaveDivider create a new BatchInsertDivider instance
+func NewBatchSaveDivider(basicRes core.BasicRes, batchSize int, table string,
params string) *BatchSaveDivider {
+ log := basicRes.GetLogger().Nested("batch divider")
return &BatchSaveDivider{
- db: db,
+ basicRes: basicRes,
+ log: log,
+ db: basicRes.GetDal(),
batches: make(map[reflect.Type]*BatchSave),
batchSize: batchSize,
+ table: table,
+ params: params,
}
}
-func (d *BatchSaveDivider) OnNewBatchSave(cb OnNewBatchSave) {
- d.onNewBatchSave = cb
-}
-
-// return *BatchSave for specified type
-func (d *BatchSaveDivider) ForType(rowType reflect.Type, log core.Logger)
(*BatchSave, error) {
+// ForType returns a `BatchSave` instance for specific type
+func (d *BatchSaveDivider) ForType(rowType reflect.Type) (*BatchSave, error) {
// get the cache for the specific type
batch := d.batches[rowType]
var err error
// create one if not exists
if batch == nil {
- batch, err = NewBatchSave(d.db, log, rowType, d.batchSize)
+ batch, err = NewBatchSave(d.basicRes, rowType, d.batchSize)
if err != nil {
return nil, err
}
- if d.onNewBatchSave != nil {
- err = d.onNewBatchSave(rowType)
- if err != nil {
- return nil, err
- }
- }
d.batches[rowType] = batch
+ // delete outdated records if rowType was not PartialUpdate
+ rowElemType := rowType.Elem()
+ d.log.Debug("missing BatchSave for type %s", rowElemType.Name())
+ row := reflect.New(rowElemType).Interface()
+ // check if rowType has RawDataOrigin embedded
+ field, hasField := rowElemType.FieldByName("RawDataOrigin")
+ if !hasField || field.Type !=
reflect.TypeOf(common.RawDataOrigin{}) {
+ return nil, fmt.Errorf("type %s must have RawDataOrigin
embeded", rowElemType.Name())
+ }
+ // all good, delete outdated records before insertion
+ d.log.Debug("deleting outdate records for %s",
rowElemType.Name())
+ d.db.Delete(
+ row,
+ dal.Where("_raw_data_table = ? AND _raw_data_params =
?", d.table, d.params),
+ )
}
return batch, nil
}
-// close all batches so all rest records get saved into db as well
+func (d *BatchSaveDivider) flushBatch() {
+
+}
+
+// Close all batches so the rest records get saved into db
func (d *BatchSaveDivider) Close() error {
for _, batch := range d.batches {
err := batch.Close()
diff --git a/plugins/helper/batch_save_divider_test.go
b/plugins/helper/batch_save_divider_test.go
new file mode 100644
index 00000000..ae933a05
--- /dev/null
+++ b/plugins/helper/batch_save_divider_test.go
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package helper
+
+import (
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/apache/incubator-devlake/helpers/unithelper"
+ "github.com/apache/incubator-devlake/mocks"
+ "github.com/apache/incubator-devlake/models/common"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+)
+
+type MockJirIssueBsd struct {
+ common.RawDataOrigin
+ ID string `gorm:"primaryKey"`
+ Title string
+ ChangelogUpdated *time.Time
+}
+
+type MockJiraChangelogBsd struct {
+ common.RawDataOrigin
+ ID string `gorm:"primaryKey"`
+}
+
+type MockJiraIssueChangelogUpdatedBsd struct {
+ ID string `gorm:"primaryKey"`
+ ChangelogUpdated *time.Time
+}
+
+func (MockJiraIssueChangelogUpdatedBsd) PartialUpdate() {}
+
+func TestBatchSaveDivider(t *testing.T) {
+ mockDal := new(mocks.Dal)
+
+ mockLog := unithelper.DummyLogger()
+ mockRes := new(mocks.BasicRes)
+ mockRes.On("GetDal").Return(mockDal)
+ mockRes.On("GetLogger").Return(mockLog)
+
+ divider := NewBatchSaveDivider(mockRes, 10, "", "")
+
+ // we expect total 2 deletion calls after all code got carried out
+ mockDal.On("Delete", mock.Anything, mock.Anything).Return(nil).Twice()
+
+ // for same type should return the same BatchSave
+ jiraIssue1, err := divider.ForType(reflect.TypeOf(&MockJirIssueBsd{}))
+ assert.Nil(t, err)
+
+ jiraIssue2, err := divider.ForType(reflect.TypeOf(&MockJirIssueBsd{}))
+ assert.Nil(t, err)
+ assert.Equal(t, jiraIssue1, jiraIssue2)
+
+ // for different types should return different BatchSaves
+ jiraChangelog1, err :=
divider.ForType(reflect.TypeOf(&MockJiraChangelogBsd{}))
+ assert.Nil(t, err)
+
+ jiraChangelog2, err :=
divider.ForType(reflect.TypeOf(&MockJiraChangelogBsd{}))
+ assert.Nil(t, err)
+ assert.Equal(t, jiraChangelog1, jiraChangelog2)
+
+ // assertion
+ mockDal.AssertExpectations(t)
+}
diff --git a/plugins/helper/batch_save_divider_test.go.old
b/plugins/helper/batch_save_divider_test.go.old
deleted file mode 100644
index 42991a33..00000000
--- a/plugins/helper/batch_save_divider_test.go.old
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package helper
-
-import (
- "github.com/apache/incubator-devlake/logger"
- "reflect"
- "testing"
-
- "github.com/stretchr/testify/assert"
- "gorm.io/gorm"
-)
-
-// go test -gcflags=all=-l
-
-var TestBatchSize int = 100
-
-func CreateTestBatchSaveDivider() *BatchSaveDivider {
- return NewBatchSaveDivider(&gorm.DB{}, TestBatchSize)
-}
-
-func TestBatchSaveDivider(t *testing.T) {
- MockDB(t)
- defer UnMockDB()
- batchSaveDivider := CreateTestBatchSaveDivider()
- initTimes := 0
-
- batchSaveDivider.OnNewBatchSave(func(rowType reflect.Type) error {
- initTimes++
- return nil
- })
-
- var err error
- var b1 *BatchSave
- var b2 *BatchSave
- var b3 *BatchSave
-
- // test if it saved and only saved once for one Type
- b1, err = batchSaveDivider.ForType(reflect.TypeOf(TestTableData),
logger.Global)
- assert.Equal(t, initTimes, 1)
- assert.Equal(t, err, nil)
- b2, err = batchSaveDivider.ForType(reflect.TypeOf(&TestTable2{}),
logger.Global)
- assert.Equal(t, initTimes, 2)
- assert.Equal(t, err, nil)
- b3, err = batchSaveDivider.ForType(reflect.TypeOf(TestTableData),
logger.Global)
- assert.Equal(t, initTimes, 2)
- assert.Equal(t, err, nil)
-
- assert.NotEqual(t, b1, b2)
- assert.Equal(t, b1, b3)
-}
diff --git a/plugins/helper/data_convertor.go b/plugins/helper/data_convertor.go
index 44a9affb..071d5f69 100644
--- a/plugins/helper/data_convertor.go
+++ b/plugins/helper/data_convertor.go
@@ -19,11 +19,8 @@ package helper
import (
"database/sql"
- "fmt"
"reflect"
- "github.com/apache/incubator-devlake/models/common"
-
"github.com/apache/incubator-devlake/plugins/core"
)
@@ -69,25 +66,9 @@ func (converter *DataConverter) Execute() error {
// load data from database
db := converter.args.Ctx.GetDb()
- // batch insertion divider
+ // batch save divider
RAW_DATA_ORIGIN := "RawDataOrigin"
- divider := NewBatchSaveDivider(db, converter.args.BatchSize)
- divider.OnNewBatchSave(func(rowType reflect.Type) error {
- // check if row type has RawDataOrigin
- if rawDataOrigin, ok :=
rowType.Elem().FieldByName(RAW_DATA_ORIGIN); ok {
- if (rawDataOrigin.Type !=
reflect.TypeOf(common.RawDataOrigin{})) {
- return fmt.Errorf("type %s must nested
RawDataOrigin struct", rowType.Name())
- }
- } else {
- return fmt.Errorf("type %s must nested RawDataOrigin
struct", rowType.Name())
- }
- // delete old data
- return db.Delete(
- reflect.New(rowType).Interface(),
- "_raw_data_table = ? AND _raw_data_params = ?",
- converter.table, converter.params,
- ).Error
- })
+ divider := NewBatchSaveDivider(converter.args.Ctx,
converter.args.BatchSize, converter.table, converter.params)
// progress
converter.args.Ctx.SetProgress(0, -1)
@@ -115,13 +96,15 @@ func (converter *DataConverter) Execute() error {
for _, result := range results {
// get the batch operator for the specific type
- batch, err := divider.ForType(reflect.TypeOf(result),
converter.args.Ctx.GetLogger())
+ batch, err := divider.ForType(reflect.TypeOf(result))
if err != nil {
return err
}
// set raw data origin field
-
reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN).
-
Set(reflect.ValueOf(inputRow).Elem().FieldByName(RAW_DATA_ORIGIN))
+ origin :=
reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
+ if origin.IsValid() {
+
origin.Set(reflect.ValueOf(inputRow).Elem().FieldByName(RAW_DATA_ORIGIN))
+ }
// records get saved into db when slots were max outed
err = batch.Add(result)
if err != nil {