This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit 00f34fabd8d07a35a02acc7cbadf367943067ff3
Author: long2ice <[email protected]>
AuthorDate: Mon Sep 19 19:49:10 2022 +0800

    feat: starrocks plugin support pg array
---
 plugins/starrocks/tasks.go | 30 +++++++++++++++++++++---------
 plugins/starrocks/utils.go |  7 ++++++-
 2 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index b0a096d9..5bd68265 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -26,6 +26,7 @@ import (
        "github.com/apache/incubator-devlake/impl/dalgorm"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/core/dal"
+       "github.com/lib/pq"
        "gorm.io/driver/mysql"
        "gorm.io/driver/postgres"
        "gorm.io/gorm"
@@ -104,12 +105,13 @@ func LoadData(c core.SubTaskContext) errors.Error {
 
        for _, table := range starrocksTables {
                starrocksTable := strings.TrimLeft(table, "_")
-               err = createTable(starrocks, db, starrocksTable, table, c, 
config.Extra)
+               var columnMap map[string]string
+               columnMap, err = createTable(starrocks, db, starrocksTable, 
table, c, config.Extra)
                if err != nil {
                        c.GetLogger().Error(err, "create table %s in starrocks 
error", table)
                        return errors.Convert(err)
                }
-               err = loadData(starrocks, c, starrocksTable, table, db, config)
+               err = loadData(starrocks, c, starrocksTable, table, columnMap, 
db, config)
                if err != nil {
                        c.GetLogger().Error(err, "load data %s error", table)
                        return errors.Convert(err)
@@ -117,10 +119,11 @@ func LoadData(c core.SubTaskContext) errors.Error {
        }
        return nil
 }
-func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table 
string, c core.SubTaskContext, extra string) errors.Error {
+func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table 
string, c core.SubTaskContext, extra string) (map[string]string, errors.Error) {
        columeMetas, err := db.GetColumns(&Table{name: table}, nil)
+       columnMap := make(map[string]string)
        if err != nil {
-               return err
+               return columnMap, err
        }
        var pks []string
        var columns []string
@@ -129,9 +132,11 @@ func createTable(starrocks *sql.DB, db dal.Dal, 
starrocksTable string, table str
                name := cm.Name()
                starrocksDatatype, ok := cm.ColumnType()
                if !ok {
-                       return errors.Default.New(fmt.Sprintf("Get [%s] 
ColumeType Failed", name))
+                       return columnMap, errors.Default.New(fmt.Sprintf("Get 
[%s] ColumeType Failed", name))
                }
-               column := fmt.Sprintf("`%s` %s", name, 
getDataType(starrocksDatatype))
+               dataType := getDataType(starrocksDatatype)
+               columnMap[name] = dataType
+               column := fmt.Sprintf("`%s` %s", name, dataType)
                columns = append(columns, column)
                isPrimaryKey, ok := cm.PrimaryKey()
                if isPrimaryKey && ok {
@@ -152,10 +157,10 @@ func createTable(starrocks *sql.DB, db dal.Dal, 
starrocksTable string, table str
        tableSql := fmt.Sprintf("create table if not exists `%s` ( %s ) %s", 
starrocksTable, strings.Join(columns, ","), extra)
        c.GetLogger().Info(tableSql)
        _, err = errors.Convert01(starrocks.Exec(tableSql))
-       return err
+       return columnMap, err
 }
 
-func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, 
table string, db dal.Dal, config *StarRocksConfig) error {
+func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, 
table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig) 
error {
        offset := 0
        starrocksTmpTable := starrocksTable + "_tmp"
        // create tmp table in starrocks
@@ -181,7 +186,14 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, 
starrocksTable string, t
                        columns := make([]interface{}, len(cols))
                        columnPointers := make([]interface{}, len(cols))
                        for i := range columns {
-                               columnPointers[i] = &columns[i]
+                               dataType := columnMap[cols[i]]
+                               if strings.HasPrefix(dataType, "array") {
+                                       var arr []string
+                                       columns[i] = &arr
+                                       columnPointers[i] = pq.Array(&arr)
+                               } else {
+                                       columnPointers[i] = &columns[i]
+                               }
                        }
                        err = rows.Scan(columnPointers...)
                        if err != nil {
diff --git a/plugins/starrocks/utils.go b/plugins/starrocks/utils.go
index ace195ff..0aa1ce5b 100644
--- a/plugins/starrocks/utils.go
+++ b/plugins/starrocks/utils.go
@@ -17,7 +17,10 @@ limitations under the License.
 
 package main
 
-import "strings"
+import (
+       "fmt"
+       "strings"
+)
 
 func getTablesByDomainLayer(domainLayer string) []string {
        switch domainLayer {
@@ -102,6 +105,8 @@ func getDataType(dataType string) string {
                starrocksDatatype = "json"
        } else if dataType == "uuid" {
                starrocksDatatype = "char(36)"
+       } else if strings.HasSuffix(dataType, "[]") {
+               starrocksDatatype = fmt.Sprintf("array<%s>", 
getDataType(strings.Split(dataType, "[]")[0]))
        }
        return starrocksDatatype
 }

Reply via email to