This is an automated email from the ASF dual-hosted git repository. warren pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 80233635d0399906b3c6c75a8c622e82bf5701c9 Author: Jinlong Peng <[email protected]> AuthorDate: Thu Sep 1 16:58:01 2022 +0800 feat: support source dsn --- .../src/data/pipeline-config-samples/starrocks.js | 2 ++ plugins/starrocks/api/connection.go | 2 ++ plugins/starrocks/starrocks.go | 24 ++++++++++------- plugins/starrocks/task_data.go | 2 ++ plugins/starrocks/tasks.go | 31 +++++++++++++++++++--- plugins/starrocks/utils.go | 2 ++ 6 files changed, 49 insertions(+), 14 deletions(-) diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js b/config-ui/src/data/pipeline-config-samples/starrocks.js index a6031392..dc8cde08 100644 --- a/config-ui/src/data/pipeline-config-samples/starrocks.js +++ b/config-ui/src/data/pipeline-config-samples/starrocks.js @@ -20,6 +20,8 @@ const starRocksConfig = [ { plugin: 'starrocks', options: { + source_type: '', // mysql or postgres + source_dsn: '', // gorm dsn host: '127.0.0.1', port: 9030, user: 'root', diff --git a/plugins/starrocks/api/connection.go b/plugins/starrocks/api/connection.go index f6ac6b0d..21daaddd 100644 --- a/plugins/starrocks/api/connection.go +++ b/plugins/starrocks/api/connection.go @@ -36,6 +36,8 @@ func PostStarRocksPipeline(input *core.ApiResourceInput) (*core.ApiResourceOutpu type StarRocksPipelinePlan [][]struct { Plugin string `json:"plugin"` Options struct { + SourceType string `json:"source_type"` + SourceDsn string `json:"source_dsn"` Host string `json:"host"` Port int `json:"port"` User string `json:"user"` diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go index eb3f6109..14c4175d 100644 --- a/plugins/starrocks/starrocks.go +++ b/plugins/starrocks/starrocks.go @@ -59,6 +59,8 @@ var PluginEntry StarRocks func main() { cmd := &cobra.Command{Use: "StarRocks"} + sourceType := cmd.Flags().StringP("source_type", "sp", "", "Source type") + sourceDsn := cmd.Flags().StringP("source_dsn", "sd", "", "Source dsn") _ = cmd.MarkFlagRequired("host") host := cmd.Flags().StringP("host", "h", "", "StarRocks host") _ = cmd.MarkFlagRequired("port") @@ -80,16 +82,18 @@ func main() { extra := cmd.Flags().StringP("extra", "e", "", "StarRocks create table sql extra") cmd.Run = func(cmd *cobra.Command, args []string) { runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{ - "host": host, - "port": port, - "user": user, - "password": password, - "database": database, - "be_host": beHost, - "be_port": bePort, - "tables": tables, - "batch_size": batchSize, - "extra": extra, + "source_type": sourceType, + "source_dsn": sourceDsn, + "host": host, + "port": port, + "user": user, + "password": password, + "database": database, + "be_host": beHost, + "be_port": bePort, + "tables": tables, + "batch_size": batchSize, + "extra": extra, }) } runner.RunCmd(cmd) diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go index 76f4b1b4..bb0879b8 100644 --- a/plugins/starrocks/task_data.go +++ b/plugins/starrocks/task_data.go @@ -17,6 +17,8 @@ limitations under the License. package main type StarRocksConfig struct { + SourceType string `mapstructure:"source_type"` + SourceDsn string `mapstructure:"source_dsn"` Host string Port int User string diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go index 61d4989a..ece4131c 100644 --- a/plugins/starrocks/tasks.go +++ b/plugins/starrocks/tasks.go @@ -21,14 +21,17 @@ import ( "database/sql" "encoding/json" "fmt" + "github.com/apache/incubator-devlake/impl/dalgorm" + "github.com/apache/incubator-devlake/plugins/core" + "github.com/apache/incubator-devlake/plugins/core/dal" + "gorm.io/driver/mysql" + "gorm.io/driver/postgres" + "gorm.io/gorm" "io" "net/http" "net/url" "regexp" "strings" - - "github.com/apache/incubator-devlake/plugins/core" - "github.com/apache/incubator-devlake/plugins/core/dal" ) type Table struct { @@ -40,8 +43,28 @@ func (t *Table) TableName() string { } func LoadData(c core.SubTaskContext) error { + var db dal.Dal config := c.GetData().(*StarRocksConfig) - db := c.GetDal() + if config.SourceDsn != "" && config.SourceType != "" { + var o *gorm.DB + var err error + if config.SourceType == "mysql" { + o, err = gorm.Open(mysql.Open(config.SourceDsn)) + if err != nil { + return err + } + } else if config.SourceType == "postgres" { + o, err = gorm.Open(postgres.Open(config.SourceDsn)) + if err != nil { + return err + } + } else { + return fmt.Errorf("unsupported source type %s", config.SourceType) + } + db = dalgorm.NewDalgorm(o) + } else { + db = c.GetDal() + } var starrocksTables []string if config.DomainLayer != "" { starrocksTables = getTablesByDomainLayer(config.DomainLayer) diff --git a/plugins/starrocks/utils.go b/plugins/starrocks/utils.go index 0306ec3a..a68c79e5 100644 --- a/plugins/starrocks/utils.go +++ b/plugins/starrocks/utils.go @@ -99,6 +99,8 @@ func getDataType(dataType string) string { starrocksDatatype = "double" } else if stringIn(dataType, "json", "jsonb") { starrocksDatatype = "json" + } else if dataType == "uuid" { + starrocksDatatype = "char(36)" } return starrocksDatatype }
