This is an automated email from the ASF dual-hosted git repository.
zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new b2b9e053 feat: improve starrocks plugin (#2415)
b2b9e053 is described below
commit b2b9e0537334ee674f466837e49f220f349a7f92
Author: long2ice <[email protected]>
AuthorDate: Wed Jul 6 00:00:02 2022 +0800
feat: improve starrocks plugin (#2415)
* feat: improve starrocks plugin
* ci: fix lint
---
.../src/data/pipeline-config-samples/starrocks.js | 4 +-
impl/dalgorm/dalgorm.go | 37 ++++
plugins/core/dal/dal.go | 2 +
plugins/starrocks/starrocks.go | 20 +-
plugins/starrocks/task_data.go | 16 +-
plugins/starrocks/tasks.go | 213 +++++++++++++--------
plugins/starrocks/{task_data.go => utils.go} | 24 ++-
7 files changed, 216 insertions(+), 100 deletions(-)
diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js b/config-ui/src/data/pipeline-config-samples/starrocks.js
index 74f4bcd2..8e93e87b 100644
--- a/config-ui/src/data/pipeline-config-samples/starrocks.js
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -26,7 +26,9 @@ const starRocksConfig = [
password: '',
database: 'lake',
be_port: 8040,
- tables: ['_tool_github_commits']
+ tables: ['_tool_.*'], // support regexp
+ batch_size: 10000,
+ extra: '' // will append to create table sql
}
}
]
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index eb68c0a4..773c2e06 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -19,6 +19,7 @@ package dalgorm
import (
"database/sql"
+ "fmt"
"strings"
"github.com/apache/incubator-devlake/plugins/core/dal"
@@ -171,6 +172,42 @@ func (d *Dalgorm) AllTables() ([]string, error) {
}
return filteredTables, nil
}
+
+// GetTableColumns returns table columns in database
+func (d *Dalgorm) GetTableColumns(table string) (map[string]string, error) {
+ var columnSql string
+ ret := make(map[string]string)
+ if d.db.Dialector.Name() == "mysql" {
+ type MySQLColumn struct {
+ Field string
+ Type string
+ }
+ var result []MySQLColumn
+ columnSql = fmt.Sprintf("show columns from %s", table)
+ err := d.db.Raw(columnSql).Scan(&result).Error
+ if err != nil {
+ return nil, err
+ }
+ for _, item := range result {
+ ret[item.Field] = item.Type
+ }
+ } else {
+ columnSql = fmt.Sprintf("select column_name,data_type from information_schema.COLUMNS where TABLE_NAME='%s' and TABLE_SCHEMA='public'", table)
+ type PostgresColumn struct {
+ ColumnName string `gorm:"column_name"`
+ DataType string `gorm:"data_type"`
+ }
+ var result []PostgresColumn
+ err := d.db.Raw(columnSql).Scan(&result).Error
+ if err != nil {
+ return nil, err
+ }
+ for _, item := range result {
+ ret[item.ColumnName] = item.DataType
+ }
+ }
+ return ret, nil
+}
func NewDalgorm(db *gorm.DB) *Dalgorm {
return &Dalgorm{db}
}
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 4b297f59..85d74c3b 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -58,6 +58,8 @@ type Dal interface {
Delete(entity interface{}, clauses ...Clause) error
// AllTables returns all tables in database
AllTables() ([]string, error)
+ // GetTableColumns returns table columns in database
+ GetTableColumns(table string) (map[string]string, error)
}
type DalClause struct {
diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go
index c4e5bb30..99f062b7 100644
--- a/plugins/starrocks/starrocks.go
+++ b/plugins/starrocks/starrocks.go
@@ -66,15 +66,21 @@ func main() {
database := cmd.Flags().StringP("database", "d", "", "StarRocks database")
_ = cmd.MarkFlagRequired("table")
tables := cmd.Flags().StringArrayP("table", "t", []string{}, "StarRocks table")
+ _ = cmd.MarkFlagRequired("batch_size")
+ batchSize := cmd.Flags().StringP("batch_size", "b", "", "StarRocks insert batch size")
+ _ = cmd.MarkFlagRequired("batch_size")
+ extra := cmd.Flags().StringP("extra", "e", "", "StarRocks create table sql extra")
cmd.Run = func(cmd *cobra.Command, args []string) {
runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
- "host": host,
- "port": port,
- "user": user,
- "password": password,
- "database": database,
- "be_port": bePort,
- "tables": tables,
+ "host": host,
+ "port": port,
+ "user": user,
+ "password": password,
+ "database": database,
+ "be_port": bePort,
+ "tables": tables,
+ "batch_size": batchSize,
+ "extra": extra,
})
}
runner.RunCmd(cmd)
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
index ceb86c00..aaf907a7 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/task_data.go
@@ -17,11 +17,13 @@ limitations under the License.
package main
type StarRocksConfig struct {
- Host string
- Port int
- User string
- Password string
- Database string
- BePort int `mapstructure:"be_port"`
- Tables []string
+ Host string
+ Port int
+ User string
+ Password string
+ Database string
+ BePort int `mapstructure:"be_port"`
+ Tables []string
+ BatchSize int `mapstructure:"batch_size"`
+ Extra string
}
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index a63a5de5..b14598b1 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -23,6 +23,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
+ "regexp"
"strings"
"github.com/apache/incubator-devlake/plugins/core"
@@ -33,11 +34,25 @@ func LoadData(c core.SubTaskContext) error {
config := c.GetData().(*StarRocksConfig)
db := c.GetDal()
tables := config.Tables
- var err error
+ allTables, err := db.AllTables()
+ if err != nil {
+ return err
+ }
+ var starrocksTables []string
if len(tables) == 0 {
- tables, err = db.AllTables()
- if err != nil {
- return err
+ starrocksTables = allTables
+ } else {
+ for _, table := range allTables {
+ for _, r := range tables {
+ var ok bool
+ ok, err = regexp.Match(r, []byte(table))
+ if err != nil {
+ return err
+ }
+ if ok {
+ starrocksTables = append(starrocksTables, table)
+ }
+ }
}
}
starrocks, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", config.User, config.Password, config.Host, config.Port, config.Database))
@@ -45,100 +60,144 @@ func LoadData(c core.SubTaskContext) error {
return err
}
- for _, table := range tables {
- err = loadData(starrocks, c, table, db, config)
+ for _, table := range starrocksTables {
+ starrocksTable := strings.TrimLeft(table, "_")
+ err = createTable(starrocks, db, starrocksTable, table, c, config.Extra)
+ if err != nil {
+ c.GetLogger().Error("create table %s in starrocks error: %s", table, err)
+ return err
+ }
+ err = loadData(starrocks, c, starrocksTable, table, db, config)
if err != nil {
c.GetLogger().Error("load data %s error: %s", table, err)
+ return err
}
}
return nil
}
-func loadData(starrocks *sql.DB, c core.SubTaskContext, table string, db dal.Dal, config *StarRocksConfig) error {
- var data []map[string]interface{}
- // select data from db
- rows, err := db.RawCursor(fmt.Sprintf("select * from %s", table))
+func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table string, c core.SubTaskContext, extra string) error {
+ columnMap, err := db.GetTableColumns(table)
if err != nil {
return err
}
- cols, err := rows.Columns()
- if err != nil {
- return err
+ var pk string
+ if _, ok := columnMap["id"]; ok {
+ pk = "id"
+ } else {
+ for k := range columnMap {
+ pk = k
+ break
+ }
+ }
+ var columns []string
+ for field, dataType := range columnMap {
+ starrocksDatatype := getDataType(dataType)
+ column := fmt.Sprintf("%s %s", field, starrocksDatatype)
+ columns = append(columns, column)
+ }
+ if extra == "" {
+ extra = fmt.Sprintf(`engine=olap distributed by hash(%s) properties("replication_num" = "1")`, pk)
}
- for rows.Next() {
- row := make(map[string]interface{})
- columns := make([]string, len(cols))
- columnPointers := make([]interface{}, len(cols))
- for i := range columns {
- columnPointers[i] = &columns[i]
- }
- err = rows.Scan(columnPointers...)
+ tableSql := fmt.Sprintf(`create table if not exists %s ( %s ) %s`, starrocksTable, strings.Join(columns, ","), extra)
+ c.GetLogger().Info(tableSql)
+ _, err = starrocks.Exec(tableSql)
+ return err
+}
+func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, table string, db dal.Dal, config *StarRocksConfig) error {
+ offset := 0
+ starrocksTmpTable := starrocksTable + "_tmp"
+ // create tmp table in starrocks
+ _, execErr := starrocks.Exec(fmt.Sprintf("create table %s like %s", starrocksTmpTable, starrocksTable))
+ if execErr != nil {
+ return execErr
+ }
+ for {
+ var data []map[string]interface{}
+ // select data from db
+ rows, err := db.RawCursor(fmt.Sprintf("select * from %s limit %d offset %d", table, config.BatchSize, offset))
if err != nil {
return err
}
- for i, colName := range cols {
- row[colName] = columns[i]
+ cols, err := rows.Columns()
+ if err != nil {
+ return err
}
- data = append(data, row)
- }
- if len(data) == 0 {
- c.GetLogger().Warn("table %s is empty, so skip", table)
- return nil
- }
- starrocksTable := strings.TrimLeft(table, "_")
- // create tmp table in starrocks
- _, err = starrocks.Exec(fmt.Sprintf("create table %s_tmp like %s", starrocksTable, starrocksTable))
- if err != nil {
- return err
- }
- // insert data to tmp table
- url := fmt.Sprintf("http://%s:%d/api/%s/%s_tmp/_stream_load", config.Host, config.BePort, config.Database, starrocksTable)
- headers := map[string]string{
- "format": "json",
- "strip_outer_array": "true",
- "Expect": "100-continue",
- "ignore_json_size": "true",
- }
- // marshal User to json
- jsonData, err := json.Marshal(data)
- if err != nil {
- panic(err)
- }
- client := http.Client{}
- req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(jsonData))
- if err != nil {
- panic(err)
- }
- req.SetBasicAuth(config.User, config.Password)
- for k, v := range headers {
- req.Header.Set(k, v)
- }
- resp, err := client.Do(req)
- if err != nil {
- return err
+ for rows.Next() {
+ row := make(map[string]interface{})
+ columns := make([]string, len(cols))
+ columnPointers := make([]interface{}, len(cols))
+ for i := range columns {
+ columnPointers[i] = &columns[i]
+ }
+ err = rows.Scan(columnPointers...)
+ if err != nil {
+ return err
+ }
+ for i, colName := range cols {
+ row[colName] = columns[i]
+ }
+ data = append(data, row)
+ }
+ if len(data) == 0 {
+ c.GetLogger().Warn("no data found in table %s already, limit: %d, offset: %d, so break", table, config.BatchSize, offset)
+ break
+ }
+ // insert data to tmp table
+ url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", config.Host, config.BePort, config.Database, starrocksTmpTable)
+ headers := map[string]string{
+ "format": "json",
+ "strip_outer_array": "true",
+ "Expect": "100-continue",
+ "ignore_json_size": "true",
+ }
+ jsonData, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
+ client := http.Client{}
+ req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(jsonData))
+ if err != nil {
+ return err
+ }
+ req.SetBasicAuth(config.User, config.Password)
+ for k, v := range headers {
+ req.Header.Set(k, v)
+ }
+ resp, err := client.Do(req)
+ if err != nil {
+ return err
+ }
+ b, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ var result map[string]interface{}
+ err = json.Unmarshal(b, &result)
+ if err != nil {
+ return err
+ }
+ if resp.StatusCode != http.StatusOK {
+ c.GetLogger().Error("%s %s", resp.StatusCode, b)
+ }
+ if result["Status"] != "Success" {
+ c.GetLogger().Error("load %s failed: %s", table, b)
+ } else {
+ c.GetLogger().Info("load %s success: %s, limit: %d, offset: %d", table, b, config.BatchSize, offset)
+ }
+ offset += len(data)
}
- b, err := ioutil.ReadAll(resp.Body)
+ // drop old table
+ _, err := starrocks.Exec(fmt.Sprintf("drop table if exists %s", starrocksTable))
if err != nil {
return err
}
- var result map[string]interface{}
- err = json.Unmarshal(b, &result)
+ // rename tmp table to old table
+ _, err = starrocks.Exec(fmt.Sprintf("alter table %s rename %s", starrocksTmpTable, starrocksTable))
if err != nil {
return err
}
- if resp.StatusCode != http.StatusOK {
- c.GetLogger().Error("%s %s", resp.StatusCode, b)
- }
- if result["Status"] != "Success" {
- c.GetLogger().Error("load %s failed: %s", table, b)
- } else {
- // drop old table and rename tmp table to old table
- _, err = starrocks.Exec(fmt.Sprintf("drop table if exists %s;alter table %s_tmp rename %s", starrocksTable, starrocksTable, starrocksTable))
- if err != nil {
- return err
- }
- c.GetLogger().Info("load %s to starrocks success", table)
- }
- return err
+ c.GetLogger().Info("load %s to starrocks success", table)
+ return nil
}
var (
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/utils.go
similarity index 58%
copy from plugins/starrocks/task_data.go
copy to plugins/starrocks/utils.go
index ceb86c00..01415af1 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/utils.go
@@ -16,12 +16,20 @@ limitations under the License.
*/
package main
-type StarRocksConfig struct {
- Host string
- Port int
- User string
- Password string
- Database string
- BePort int `mapstructure:"be_port"`
- Tables []string
+import "strings"
+
+func getDataType(dataType string) string {
+ starrocksDatatype := dataType
+ if strings.HasPrefix(dataType, "varchar") {
+ starrocksDatatype = "string"
+ } else if strings.HasPrefix(dataType, "datetime") {
+ starrocksDatatype = "datetime"
+ } else if strings.HasPrefix(dataType, "bigint") {
+ starrocksDatatype = "bigint"
+ } else if dataType == "longtext" || dataType == "text" || dataType == "longblob" {
+ starrocksDatatype = "string"
+ } else if dataType == "tinyint(1)" {
+ starrocksDatatype = "boolean"
+ }
+ return starrocksDatatype
}