This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 00f34fabd8d07a35a02acc7cbadf367943067ff3 Author: long2ice <[email protected]> AuthorDate: Mon Sep 19 19:49:10 2022 +0800 feat: starrocks plugin support pg array --- plugins/starrocks/tasks.go | 30 +++++++++++++++++++++--------- plugins/starrocks/utils.go | 7 ++++++- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go index b0a096d9..5bd68265 100644 --- a/plugins/starrocks/tasks.go +++ b/plugins/starrocks/tasks.go @@ -26,6 +26,7 @@ import ( "github.com/apache/incubator-devlake/impl/dalgorm" "github.com/apache/incubator-devlake/plugins/core" "github.com/apache/incubator-devlake/plugins/core/dal" + "github.com/lib/pq" "gorm.io/driver/mysql" "gorm.io/driver/postgres" "gorm.io/gorm" @@ -104,12 +105,13 @@ func LoadData(c core.SubTaskContext) errors.Error { for _, table := range starrocksTables { starrocksTable := strings.TrimLeft(table, "_") - err = createTable(starrocks, db, starrocksTable, table, c, config.Extra) + var columnMap map[string]string + columnMap, err = createTable(starrocks, db, starrocksTable, table, c, config.Extra) if err != nil { c.GetLogger().Error(err, "create table %s in starrocks error", table) return errors.Convert(err) } - err = loadData(starrocks, c, starrocksTable, table, db, config) + err = loadData(starrocks, c, starrocksTable, table, columnMap, db, config) if err != nil { c.GetLogger().Error(err, "load data %s error", table) return errors.Convert(err) @@ -117,10 +119,11 @@ func LoadData(c core.SubTaskContext) errors.Error { } return nil } -func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table string, c core.SubTaskContext, extra string) errors.Error { +func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table string, c core.SubTaskContext, extra string) (map[string]string, errors.Error) { columeMetas, err := db.GetColumns(&Table{name: table}, nil) + columnMap := make(map[string]string) if err != nil { - return err + return 
columnMap, err } var pks []string var columns []string @@ -129,9 +132,11 @@ func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table str name := cm.Name() starrocksDatatype, ok := cm.ColumnType() if !ok { - return errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name)) + return columnMap, errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name)) } - column := fmt.Sprintf("`%s` %s", name, getDataType(starrocksDatatype)) + dataType := getDataType(starrocksDatatype) + columnMap[name] = dataType + column := fmt.Sprintf("`%s` %s", name, dataType) columns = append(columns, column) isPrimaryKey, ok := cm.PrimaryKey() if isPrimaryKey && ok { @@ -152,10 +157,10 @@ func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table str tableSql := fmt.Sprintf("create table if not exists `%s` ( %s ) %s", starrocksTable, strings.Join(columns, ","), extra) c.GetLogger().Info(tableSql) _, err = errors.Convert01(starrocks.Exec(tableSql)) - return err + return columnMap, err } -func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, table string, db dal.Dal, config *StarRocksConfig) error { +func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig) error { offset := 0 starrocksTmpTable := starrocksTable + "_tmp" // create tmp table in starrocks @@ -181,7 +186,14 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, t columns := make([]interface{}, len(cols)) columnPointers := make([]interface{}, len(cols)) for i := range columns { - columnPointers[i] = &columns[i] + dataType := columnMap[cols[i]] + if strings.HasPrefix(dataType, "array") { + var arr []string + columns[i] = &arr + columnPointers[i] = pq.Array(&arr) + } else { + columnPointers[i] = &columns[i] + } } err = rows.Scan(columnPointers...) 
if err != nil { diff --git a/plugins/starrocks/utils.go b/plugins/starrocks/utils.go index ace195ff..0aa1ce5b 100644 --- a/plugins/starrocks/utils.go +++ b/plugins/starrocks/utils.go @@ -17,7 +17,10 @@ limitations under the License. package main -import "strings" +import ( + "fmt" + "strings" +) func getTablesByDomainLayer(domainLayer string) []string { switch domainLayer { @@ -102,6 +105,8 @@ func getDataType(dataType string) string { starrocksDatatype = "json" } else if dataType == "uuid" { starrocksDatatype = "char(36)" + } else if strings.HasSuffix(dataType, "[]") { + starrocksDatatype = fmt.Sprintf("array<%s>", getDataType(strings.Split(dataType, "[]")[0])) } return starrocksDatatype }
