klesh commented on code in PR #4475:
URL:
https://github.com/apache/incubator-devlake/pull/4475#discussion_r1115177291
##########
backend/plugins/starrocks/tasks/tasks.go:
##########
@@ -48,187 +49,96 @@ func (t *Table) TableName() string {
return t.name
}
-func LoadData(c plugin.SubTaskContext) errors.Error {
- var db dal.Dal
+func ExportData(c plugin.SubTaskContext) errors.Error {
+ logger := c.GetLogger()
config := c.GetData().(*StarRocksConfig)
- if config.SourceDsn != "" && config.SourceType != "" {
- var o *gorm.DB
- var err error
- if config.SourceType == "mysql" {
- o, err = gorm.Open(mysql.Open(config.SourceDsn))
- if err != nil {
- return errors.Convert(err)
- }
- } else if config.SourceType == "postgres" {
- o, err = gorm.Open(postgres.Open(config.SourceDsn))
- if err != nil {
- return errors.Convert(err)
- }
- } else {
- return errors.NotFound.New(fmt.Sprintf("unsupported
source type %s", config.SourceType))
- }
- db = dalgorm.NewDalgorm(o)
- sqlDB, err := o.DB()
- if err != nil {
- return errors.Convert(err)
- }
- defer sqlDB.Close()
- } else {
- db = c.GetDal()
+ // 1. Get db instance
+ db, err := getDbInstance(c)
+ if err != nil {
+ return errors.Convert(err)
}
- var starrocksTables []string
- if config.DomainLayer != "" {
- starrocksTables =
utils.GetTablesByDomainLayer(config.DomainLayer)
- if starrocksTables == nil {
- return errors.NotFound.New(fmt.Sprintf("no table found
by domain layer: %s", config.DomainLayer))
- }
- } else {
- tables := config.Tables
- allTables, err := db.AllTables()
- if err != nil {
- return err
- }
- if len(tables) == 0 {
- starrocksTables = allTables
- } else {
- for _, table := range allTables {
- for _, r := range tables {
- var ok bool
- ok, err =
errors.Convert01(regexp.Match(r, []byte(table)))
- if err != nil {
- return err
- }
- if ok {
- starrocksTables =
append(starrocksTables, table)
- }
- }
- }
- }
+ // 2. Filter out the tables to import
+ starrocksTables, err := getImportTables(c, db)
Review Comment:
Should this say "export" rather than "import"? The function is `ExportData`, so "import" reads as the wrong direction here.
##########
backend/plugins/starrocks/tasks/tasks.go:
##########
@@ -48,187 +49,96 @@ func (t *Table) TableName() string {
return t.name
}
-func LoadData(c plugin.SubTaskContext) errors.Error {
- var db dal.Dal
+func ExportData(c plugin.SubTaskContext) errors.Error {
+ logger := c.GetLogger()
config := c.GetData().(*StarRocksConfig)
- if config.SourceDsn != "" && config.SourceType != "" {
- var o *gorm.DB
- var err error
- if config.SourceType == "mysql" {
- o, err = gorm.Open(mysql.Open(config.SourceDsn))
- if err != nil {
- return errors.Convert(err)
- }
- } else if config.SourceType == "postgres" {
- o, err = gorm.Open(postgres.Open(config.SourceDsn))
- if err != nil {
- return errors.Convert(err)
- }
- } else {
- return errors.NotFound.New(fmt.Sprintf("unsupported
source type %s", config.SourceType))
- }
- db = dalgorm.NewDalgorm(o)
- sqlDB, err := o.DB()
- if err != nil {
- return errors.Convert(err)
- }
- defer sqlDB.Close()
- } else {
- db = c.GetDal()
+ // 1. Get db instance
+ db, err := getDbInstance(c)
+ if err != nil {
+ return errors.Convert(err)
}
- var starrocksTables []string
- if config.DomainLayer != "" {
- starrocksTables =
utils.GetTablesByDomainLayer(config.DomainLayer)
- if starrocksTables == nil {
- return errors.NotFound.New(fmt.Sprintf("no table found
by domain layer: %s", config.DomainLayer))
- }
- } else {
- tables := config.Tables
- allTables, err := db.AllTables()
- if err != nil {
- return err
- }
- if len(tables) == 0 {
- starrocksTables = allTables
- } else {
- for _, table := range allTables {
- for _, r := range tables {
- var ok bool
- ok, err =
errors.Convert01(regexp.Match(r, []byte(table)))
- if err != nil {
- return err
- }
- if ok {
- starrocksTables =
append(starrocksTables, table)
- }
- }
- }
- }
+ // 2. Filter out the tables to import
+ starrocksTables, err := getImportTables(c, db)
Review Comment:
Maybe rename this to `getExportingTables`?
##########
backend/plugins/starrocks/tasks/tasks.go:
##########
@@ -48,187 +49,96 @@ func (t *Table) TableName() string {
return t.name
}
-func LoadData(c plugin.SubTaskContext) errors.Error {
- var db dal.Dal
+func ExportData(c plugin.SubTaskContext) errors.Error {
+ logger := c.GetLogger()
config := c.GetData().(*StarRocksConfig)
- if config.SourceDsn != "" && config.SourceType != "" {
- var o *gorm.DB
- var err error
- if config.SourceType == "mysql" {
- o, err = gorm.Open(mysql.Open(config.SourceDsn))
- if err != nil {
- return errors.Convert(err)
- }
- } else if config.SourceType == "postgres" {
- o, err = gorm.Open(postgres.Open(config.SourceDsn))
- if err != nil {
- return errors.Convert(err)
- }
- } else {
- return errors.NotFound.New(fmt.Sprintf("unsupported
source type %s", config.SourceType))
- }
- db = dalgorm.NewDalgorm(o)
- sqlDB, err := o.DB()
- if err != nil {
- return errors.Convert(err)
- }
- defer sqlDB.Close()
- } else {
- db = c.GetDal()
+ // 1. Get db instance
+ db, err := getDbInstance(c)
+ if err != nil {
+ return errors.Convert(err)
}
- var starrocksTables []string
- if config.DomainLayer != "" {
- starrocksTables =
utils.GetTablesByDomainLayer(config.DomainLayer)
- if starrocksTables == nil {
- return errors.NotFound.New(fmt.Sprintf("no table found
by domain layer: %s", config.DomainLayer))
- }
- } else {
- tables := config.Tables
- allTables, err := db.AllTables()
- if err != nil {
- return err
- }
- if len(tables) == 0 {
- starrocksTables = allTables
- } else {
- for _, table := range allTables {
- for _, r := range tables {
- var ok bool
- ok, err =
errors.Convert01(regexp.Match(r, []byte(table)))
- if err != nil {
- return err
- }
- if ok {
- starrocksTables =
append(starrocksTables, table)
- }
- }
- }
- }
+ // 2. Filter out the tables to import
+ starrocksTables, err := getImportTables(c, db)
+ if err != nil {
+ return errors.Convert(err)
}
-
- starrocks, err := sql.Open("mysql",
fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
config.User, config.Password, config.Host, config.Port, config.Database))
+ // 3. put devlake data to starrocks
+ sr, err :=
gorm.Open(mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
config.User, config.Password, config.Host, config.Port, config.Database)))
if err != nil {
return errors.Convert(err)
}
- defer starrocks.Close()
+ starrocksDb := dalgorm.NewDalgorm(sr)
for _, table := range starrocksTables {
+ select {
+ case <-c.GetContext().Done():
+ return errors.Convert(c.GetContext().Err())
+ default:
+ }
starrocksTable := strings.TrimLeft(table, "_")
starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
- var columnMap map[string]string
- var orderBy string
- var skip bool
- columnMap, orderBy, skip, err = createTmpTable(starrocks, db,
starrocksTable, starrocksTmpTable, table, c, config)
+ columnMap, orderBy, skip, err := createTmpTable(c, starrocksDb,
db, starrocksTable, starrocksTmpTable, table)
Review Comment:
How about `createTmpTableInStarrocks`?
##########
backend/plugins/starrocks/tasks/tasks.go:
##########
@@ -48,187 +49,96 @@ func (t *Table) TableName() string {
return t.name
}
-func LoadData(c plugin.SubTaskContext) errors.Error {
- var db dal.Dal
+func ExportData(c plugin.SubTaskContext) errors.Error {
+ logger := c.GetLogger()
config := c.GetData().(*StarRocksConfig)
- if config.SourceDsn != "" && config.SourceType != "" {
- var o *gorm.DB
- var err error
- if config.SourceType == "mysql" {
- o, err = gorm.Open(mysql.Open(config.SourceDsn))
- if err != nil {
- return errors.Convert(err)
- }
- } else if config.SourceType == "postgres" {
- o, err = gorm.Open(postgres.Open(config.SourceDsn))
- if err != nil {
- return errors.Convert(err)
- }
- } else {
- return errors.NotFound.New(fmt.Sprintf("unsupported
source type %s", config.SourceType))
- }
- db = dalgorm.NewDalgorm(o)
- sqlDB, err := o.DB()
- if err != nil {
- return errors.Convert(err)
- }
- defer sqlDB.Close()
- } else {
- db = c.GetDal()
+ // 1. Get db instance
+ db, err := getDbInstance(c)
+ if err != nil {
+ return errors.Convert(err)
}
- var starrocksTables []string
- if config.DomainLayer != "" {
- starrocksTables =
utils.GetTablesByDomainLayer(config.DomainLayer)
- if starrocksTables == nil {
- return errors.NotFound.New(fmt.Sprintf("no table found
by domain layer: %s", config.DomainLayer))
- }
- } else {
- tables := config.Tables
- allTables, err := db.AllTables()
- if err != nil {
- return err
- }
- if len(tables) == 0 {
- starrocksTables = allTables
- } else {
- for _, table := range allTables {
- for _, r := range tables {
- var ok bool
- ok, err =
errors.Convert01(regexp.Match(r, []byte(table)))
- if err != nil {
- return err
- }
- if ok {
- starrocksTables =
append(starrocksTables, table)
- }
- }
- }
- }
+ // 2. Filter out the tables to import
+ starrocksTables, err := getImportTables(c, db)
+ if err != nil {
+ return errors.Convert(err)
}
-
- starrocks, err := sql.Open("mysql",
fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
config.User, config.Password, config.Host, config.Port, config.Database))
+ // 3. put devlake data to starrocks
+ sr, err :=
gorm.Open(mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
config.User, config.Password, config.Host, config.Port, config.Database)))
if err != nil {
return errors.Convert(err)
}
- defer starrocks.Close()
+ starrocksDb := dalgorm.NewDalgorm(sr)
for _, table := range starrocksTables {
+ select {
+ case <-c.GetContext().Done():
+ return errors.Convert(c.GetContext().Err())
+ default:
+ }
starrocksTable := strings.TrimLeft(table, "_")
starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
- var columnMap map[string]string
- var orderBy string
- var skip bool
- columnMap, orderBy, skip, err = createTmpTable(starrocks, db,
starrocksTable, starrocksTmpTable, table, c, config)
+ columnMap, orderBy, skip, err := createTmpTable(c, starrocksDb,
db, starrocksTable, starrocksTmpTable, table)
if skip {
- c.GetLogger().Info(fmt.Sprintf("table %s is up to date,
so skip it", table))
+ logger.Info(fmt.Sprintf("table %s is up to date, so
skip it", table))
continue
}
if err != nil {
- c.GetLogger().Error(err, "create table %s in starrocks
error", table)
+ logger.Error(err, "create table %s in starrocks error",
table)
return errors.Convert(err)
}
- if db.Dialect() == "postgres" {
- err = db.Exec("begin transaction isolation level
repeatable read")
- if err != nil {
- return errors.Convert(err)
- }
- } else if db.Dialect() == "mysql" {
- err = db.Exec("set session transaction isolation level
repeatable read")
- if err != nil {
- return errors.Convert(err)
- }
- err = errors.Convert(db.Exec("start transaction"))
- if err != nil {
- return errors.Convert(err)
- }
- } else {
- return errors.NotFound.New(fmt.Sprintf("unsupported
dialect %s", db.Dialect()))
- }
- err = errors.Convert(loadData(starrocks, c, starrocksTable,
starrocksTmpTable, table, columnMap, db, config, orderBy))
- if err != nil {
- return errors.Convert(err)
- }
- err = errors.Convert(db.Exec("commit"))
+ err = putDataToDst(c, starrocksDb, db, starrocksTable,
starrocksTmpTable, table, columnMap, orderBy)
Review Comment:
Consider renaming this to `copyDataToDst`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]