This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new b2b9e053 feat: improve starrocks plugin (#2415)
b2b9e053 is described below

commit b2b9e0537334ee674f466837e49f220f349a7f92
Author: long2ice <[email protected]>
AuthorDate: Wed Jul 6 00:00:02 2022 +0800

    feat: improve starrocks plugin (#2415)
    
    * feat: improve starrocks plugin
    
    * ci: fix lint
---
 .../src/data/pipeline-config-samples/starrocks.js  |   4 +-
 impl/dalgorm/dalgorm.go                            |  37 ++++
 plugins/core/dal/dal.go                            |   2 +
 plugins/starrocks/starrocks.go                     |  20 +-
 plugins/starrocks/task_data.go                     |  16 +-
 plugins/starrocks/tasks.go                         | 213 +++++++++++++--------
 plugins/starrocks/{task_data.go => utils.go}       |  24 ++-
 7 files changed, 216 insertions(+), 100 deletions(-)

diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js b/config-ui/src/data/pipeline-config-samples/starrocks.js
index 74f4bcd2..8e93e87b 100644
--- a/config-ui/src/data/pipeline-config-samples/starrocks.js
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -26,7 +26,9 @@ const starRocksConfig = [
         password: '',
         database: 'lake',
         be_port: 8040,
-        tables: ['_tool_github_commits']
+        tables: ['_tool_.*'], // support regexp
+        batch_size: 10000,
+        extra: '' // will append to create table sql
       }
     }
   ]
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index eb68c0a4..773c2e06 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -19,6 +19,7 @@ package dalgorm
 
 import (
        "database/sql"
+       "fmt"
        "strings"
 
        "github.com/apache/incubator-devlake/plugins/core/dal"
@@ -171,6 +172,42 @@ func (d *Dalgorm) AllTables() ([]string, error) {
        }
        return filteredTables, nil
 }
+
+// GetTableColumns returns table columns in database
+func (d *Dalgorm) GetTableColumns(table string) (map[string]string, error) {
+       var columnSql string
+       ret := make(map[string]string)
+       if d.db.Dialector.Name() == "mysql" {
+               type MySQLColumn struct {
+                       Field string
+                       Type  string
+               }
+               var result []MySQLColumn
+               columnSql = fmt.Sprintf("show columns from %s", table)
+               err := d.db.Raw(columnSql).Scan(&result).Error
+               if err != nil {
+                       return nil, err
+               }
+               for _, item := range result {
+                       ret[item.Field] = item.Type
+               }
+       } else {
+               columnSql = fmt.Sprintf("select column_name,data_type from information_schema.COLUMNS where TABLE_NAME='%s' and TABLE_SCHEMA='public'", table)
+               type PostgresColumn struct {
+                       ColumnName string `gorm:"column_name"`
+                       DataType   string `gorm:"data_type"`
+               }
+               var result []PostgresColumn
+               err := d.db.Raw(columnSql).Scan(&result).Error
+               if err != nil {
+                       return nil, err
+               }
+               for _, item := range result {
+                       ret[item.ColumnName] = item.DataType
+               }
+       }
+       return ret, nil
+}
 func NewDalgorm(db *gorm.DB) *Dalgorm {
        return &Dalgorm{db}
 }
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 4b297f59..85d74c3b 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -58,6 +58,8 @@ type Dal interface {
        Delete(entity interface{}, clauses ...Clause) error
        // AllTables returns all tables in database
        AllTables() ([]string, error)
+       // GetTableColumns returns table columns in database
+       GetTableColumns(table string) (map[string]string, error)
 }
 
 type DalClause struct {
diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go
index c4e5bb30..99f062b7 100644
--- a/plugins/starrocks/starrocks.go
+++ b/plugins/starrocks/starrocks.go
@@ -66,15 +66,21 @@ func main() {
        database := cmd.Flags().StringP("database", "d", "", "StarRocks database")
        _ = cmd.MarkFlagRequired("table")
        tables := cmd.Flags().StringArrayP("table", "t", []string{}, "StarRocks table")
+       _ = cmd.MarkFlagRequired("batch_size")
+       batchSize := cmd.Flags().StringP("batch_size", "b", "", "StarRocks insert batch size")
+       _ = cmd.MarkFlagRequired("batch_size")
+       extra := cmd.Flags().StringP("extra", "e", "", "StarRocks create table sql extra")
        cmd.Run = func(cmd *cobra.Command, args []string) {
                runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
-                       "host":     host,
-                       "port":     port,
-                       "user":     user,
-                       "password": password,
-                       "database": database,
-                       "be_port":  bePort,
-                       "tables":   tables,
+                       "host":       host,
+                       "port":       port,
+                       "user":       user,
+                       "password":   password,
+                       "database":   database,
+                       "be_port":    bePort,
+                       "tables":     tables,
+                       "batch_size": batchSize,
+                       "extra":      extra,
                })
        }
        runner.RunCmd(cmd)
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
index ceb86c00..aaf907a7 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/task_data.go
@@ -17,11 +17,13 @@ limitations under the License.
 package main
 
 type StarRocksConfig struct {
-       Host     string
-       Port     int
-       User     string
-       Password string
-       Database string
-       BePort   int `mapstructure:"be_port"`
-       Tables   []string
+       Host      string
+       Port      int
+       User      string
+       Password  string
+       Database  string
+       BePort    int `mapstructure:"be_port"`
+       Tables    []string
+       BatchSize int `mapstructure:"batch_size"`
+       Extra     string
 }
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index a63a5de5..b14598b1 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -23,6 +23,7 @@ import (
        "fmt"
        "io/ioutil"
        "net/http"
+       "regexp"
        "strings"
 
        "github.com/apache/incubator-devlake/plugins/core"
@@ -33,11 +34,25 @@ func LoadData(c core.SubTaskContext) error {
        config := c.GetData().(*StarRocksConfig)
        db := c.GetDal()
        tables := config.Tables
-       var err error
+       allTables, err := db.AllTables()
+       if err != nil {
+               return err
+       }
+       var starrocksTables []string
        if len(tables) == 0 {
-               tables, err = db.AllTables()
-               if err != nil {
-                       return err
+               starrocksTables = allTables
+       } else {
+               for _, table := range allTables {
+                       for _, r := range tables {
+                               var ok bool
+                               ok, err = regexp.Match(r, []byte(table))
+                               if err != nil {
+                                       return err
+                               }
+                               if ok {
+                                       starrocksTables = 
append(starrocksTables, table)
+                               }
+                       }
                }
        }
        starrocks, err := sql.Open("mysql", 
fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", 
config.User, config.Password, config.Host, config.Port, config.Database))
@@ -45,100 +60,144 @@ func LoadData(c core.SubTaskContext) error {
                return err
        }
 
-       for _, table := range tables {
-               err = loadData(starrocks, c, table, db, config)
+       for _, table := range starrocksTables {
+               starrocksTable := strings.TrimLeft(table, "_")
+               err = createTable(starrocks, db, starrocksTable, table, c, 
config.Extra)
+               if err != nil {
+                       c.GetLogger().Error("create table %s in starrocks error: %s", table, err)
+                       return err
+               }
+               err = loadData(starrocks, c, starrocksTable, table, db, config)
                if err != nil {
                        c.GetLogger().Error("load data %s error: %s", table, 
err)
+                       return err
                }
        }
        return nil
 }
-func loadData(starrocks *sql.DB, c core.SubTaskContext, table string, db 
dal.Dal, config *StarRocksConfig) error {
-       var data []map[string]interface{}
-       // select data from db
-       rows, err := db.RawCursor(fmt.Sprintf("select * from %s", table))
+func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table 
string, c core.SubTaskContext, extra string) error {
+       columnMap, err := db.GetTableColumns(table)
        if err != nil {
                return err
        }
-       cols, err := rows.Columns()
-       if err != nil {
-               return err
+       var pk string
+       if _, ok := columnMap["id"]; ok {
+               pk = "id"
+       } else {
+               for k := range columnMap {
+                       pk = k
+                       break
+               }
+       }
+       var columns []string
+       for field, dataType := range columnMap {
+               starrocksDatatype := getDataType(dataType)
+               column := fmt.Sprintf("%s %s", field, starrocksDatatype)
+               columns = append(columns, column)
+       }
+       if extra == "" {
+               extra = fmt.Sprintf(`engine=olap distributed by hash(%s) 
properties("replication_num" = "1")`, pk)
        }
-       for rows.Next() {
-               row := make(map[string]interface{})
-               columns := make([]string, len(cols))
-               columnPointers := make([]interface{}, len(cols))
-               for i := range columns {
-                       columnPointers[i] = &columns[i]
-               }
-               err = rows.Scan(columnPointers...)
+       tableSql := fmt.Sprintf(`create table if not exists %s ( %s ) %s`, 
starrocksTable, strings.Join(columns, ","), extra)
+       c.GetLogger().Info(tableSql)
+       _, err = starrocks.Exec(tableSql)
+       return err
+}
+func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, 
table string, db dal.Dal, config *StarRocksConfig) error {
+       offset := 0
+       starrocksTmpTable := starrocksTable + "_tmp"
+       // create tmp table in starrocks
+       _, execErr := starrocks.Exec(fmt.Sprintf("create table %s like %s", 
starrocksTmpTable, starrocksTable))
+       if execErr != nil {
+               return execErr
+       }
+       for {
+               var data []map[string]interface{}
+               // select data from db
+               rows, err := db.RawCursor(fmt.Sprintf("select * from %s limit %d offset %d", table, config.BatchSize, offset))
                if err != nil {
                        return err
                }
-               for i, colName := range cols {
-                       row[colName] = columns[i]
+               cols, err := rows.Columns()
+               if err != nil {
+                       return err
                }
-               data = append(data, row)
-       }
-       if len(data) == 0 {
-               c.GetLogger().Warn("table %s is empty, so skip", table)
-               return nil
-       }
-       starrocksTable := strings.TrimLeft(table, "_")
-       // create tmp table in starrocks
-       _, err = starrocks.Exec(fmt.Sprintf("create table %s_tmp like %s", 
starrocksTable, starrocksTable))
-       if err != nil {
-               return err
-       }
-       // insert data to tmp table
-       url := fmt.Sprintf("http://%s:%d/api/%s/%s_tmp/_stream_load", config.Host, config.BePort, config.Database, starrocksTable)
-       headers := map[string]string{
-               "format":            "json",
-               "strip_outer_array": "true",
-               "Expect":            "100-continue",
-               "ignore_json_size":  "true",
-       }
-       // marshal User to json
-       jsonData, err := json.Marshal(data)
-       if err != nil {
-               panic(err)
-       }
-       client := http.Client{}
-       req, err := http.NewRequest(http.MethodPut, url, 
bytes.NewBuffer(jsonData))
-       if err != nil {
-               panic(err)
-       }
-       req.SetBasicAuth(config.User, config.Password)
-       for k, v := range headers {
-               req.Header.Set(k, v)
-       }
-       resp, err := client.Do(req)
-       if err != nil {
-               return err
+               for rows.Next() {
+                       row := make(map[string]interface{})
+                       columns := make([]string, len(cols))
+                       columnPointers := make([]interface{}, len(cols))
+                       for i := range columns {
+                               columnPointers[i] = &columns[i]
+                       }
+                       err = rows.Scan(columnPointers...)
+                       if err != nil {
+                               return err
+                       }
+                       for i, colName := range cols {
+                               row[colName] = columns[i]
+                       }
+                       data = append(data, row)
+               }
+               if len(data) == 0 {
+                       c.GetLogger().Warn("no data found in table %s already, limit: %d, offset: %d, so break", table, config.BatchSize, offset)
+                       break
+               }
+               // insert data to tmp table
+               url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", config.Host, config.BePort, config.Database, starrocksTmpTable)
+               headers := map[string]string{
+                       "format":            "json",
+                       "strip_outer_array": "true",
+                       "Expect":            "100-continue",
+                       "ignore_json_size":  "true",
+               }
+               jsonData, err := json.Marshal(data)
+               if err != nil {
+                       return err
+               }
+               client := http.Client{}
+               req, err := http.NewRequest(http.MethodPut, url, 
bytes.NewBuffer(jsonData))
+               if err != nil {
+                       return err
+               }
+               req.SetBasicAuth(config.User, config.Password)
+               for k, v := range headers {
+                       req.Header.Set(k, v)
+               }
+               resp, err := client.Do(req)
+               if err != nil {
+                       return err
+               }
+               b, err := ioutil.ReadAll(resp.Body)
+               if err != nil {
+                       return err
+               }
+               var result map[string]interface{}
+               err = json.Unmarshal(b, &result)
+               if err != nil {
+                       return err
+               }
+               if resp.StatusCode != http.StatusOK {
+                       c.GetLogger().Error("%s %s", resp.StatusCode, b)
+               }
+               if result["Status"] != "Success" {
+                       c.GetLogger().Error("load %s failed: %s", table, b)
+               } else {
+                       c.GetLogger().Info("load %s success: %s, limit: %d, offset: %d", table, b, config.BatchSize, offset)
+               }
+               offset += len(data)
        }
-       b, err := ioutil.ReadAll(resp.Body)
+       // drop old table
+       _, err := starrocks.Exec(fmt.Sprintf("drop table if exists %s", 
starrocksTable))
        if err != nil {
                return err
        }
-       var result map[string]interface{}
-       err = json.Unmarshal(b, &result)
+       // rename tmp table to old table
+       _, err = starrocks.Exec(fmt.Sprintf("alter table %s rename %s", 
starrocksTmpTable, starrocksTable))
        if err != nil {
                return err
        }
-       if resp.StatusCode != http.StatusOK {
-               c.GetLogger().Error("%s %s", resp.StatusCode, b)
-       }
-       if result["Status"] != "Success" {
-               c.GetLogger().Error("load %s failed: %s", table, b)
-       } else {
-               // drop old table and rename tmp table to old table
-               _, err = starrocks.Exec(fmt.Sprintf("drop table if exists %s;alter table %s_tmp rename %s", starrocksTable, starrocksTable, starrocksTable))
-               if err != nil {
-                       return err
-               }
-               c.GetLogger().Info("load %s to starrocks success", table)
-       }
-       return err
+       c.GetLogger().Info("load %s to starrocks success", table)
+       return nil
 }
 
 var (
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/utils.go
similarity index 58%
copy from plugins/starrocks/task_data.go
copy to plugins/starrocks/utils.go
index ceb86c00..01415af1 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/utils.go
@@ -16,12 +16,20 @@ limitations under the License.
 */
 package main
 
-type StarRocksConfig struct {
-       Host     string
-       Port     int
-       User     string
-       Password string
-       Database string
-       BePort   int `mapstructure:"be_port"`
-       Tables   []string
+import "strings"
+
+func getDataType(dataType string) string {
+       starrocksDatatype := dataType
+       if strings.HasPrefix(dataType, "varchar") {
+               starrocksDatatype = "string"
+       } else if strings.HasPrefix(dataType, "datetime") {
+               starrocksDatatype = "datetime"
+       } else if strings.HasPrefix(dataType, "bigint") {
+               starrocksDatatype = "bigint"
+       } else if dataType == "longtext" || dataType == "text" || dataType == 
"longblob" {
+               starrocksDatatype = "string"
+       } else if dataType == "tinyint(1)" {
+               starrocksDatatype = "boolean"
+       }
+       return starrocksDatatype
 }

Reply via email to