This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit 80233635d0399906b3c6c75a8c622e82bf5701c9
Author: Jinlong Peng <[email protected]>
AuthorDate: Thu Sep 1 16:58:01 2022 +0800

    feat: support source dsn
---
 .../src/data/pipeline-config-samples/starrocks.js  |  2 ++
 plugins/starrocks/api/connection.go                |  2 ++
 plugins/starrocks/starrocks.go                     | 24 ++++++++++-------
 plugins/starrocks/task_data.go                     |  2 ++
 plugins/starrocks/tasks.go                         | 31 +++++++++++++++++++---
 plugins/starrocks/utils.go                         |  2 ++
 6 files changed, 49 insertions(+), 14 deletions(-)

diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js 
b/config-ui/src/data/pipeline-config-samples/starrocks.js
index a6031392..dc8cde08 100644
--- a/config-ui/src/data/pipeline-config-samples/starrocks.js
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -20,6 +20,8 @@ const starRocksConfig = [
     {
       plugin: 'starrocks',
       options: {
+        source_type: '', // mysql or postgres
+        source_dsn: '', // gorm dsn
         host: '127.0.0.1',
         port: 9030,
         user: 'root',
diff --git a/plugins/starrocks/api/connection.go 
b/plugins/starrocks/api/connection.go
index f6ac6b0d..21daaddd 100644
--- a/plugins/starrocks/api/connection.go
+++ b/plugins/starrocks/api/connection.go
@@ -36,6 +36,8 @@ func PostStarRocksPipeline(input *core.ApiResourceInput) 
(*core.ApiResourceOutpu
 type StarRocksPipelinePlan [][]struct {
        Plugin  string `json:"plugin"`
        Options struct {
+               SourceType  string   `json:"source_type"`
+               SourceDsn   string   `json:"source_dsn"`
                Host        string   `json:"host"`
                Port        int      `json:"port"`
                User        string   `json:"user"`
diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go
index eb3f6109..14c4175d 100644
--- a/plugins/starrocks/starrocks.go
+++ b/plugins/starrocks/starrocks.go
@@ -59,6 +59,8 @@ var PluginEntry StarRocks
 
 func main() {
        cmd := &cobra.Command{Use: "StarRocks"}
+       sourceType := cmd.Flags().StringP("source_type", "sp", "", "Source 
type")
+       sourceDsn := cmd.Flags().StringP("source_dsn", "sd", "", "Source dsn")
        _ = cmd.MarkFlagRequired("host")
        host := cmd.Flags().StringP("host", "h", "", "StarRocks host")
        _ = cmd.MarkFlagRequired("port")
@@ -80,16 +82,18 @@ func main() {
        extra := cmd.Flags().StringP("extra", "e", "", "StarRocks create table 
sql extra")
        cmd.Run = func(cmd *cobra.Command, args []string) {
                runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
-                       "host":       host,
-                       "port":       port,
-                       "user":       user,
-                       "password":   password,
-                       "database":   database,
-                       "be_host":    beHost,
-                       "be_port":    bePort,
-                       "tables":     tables,
-                       "batch_size": batchSize,
-                       "extra":      extra,
+                       "source_type": sourceType,
+                       "source_dsn":  sourceDsn,
+                       "host":        host,
+                       "port":        port,
+                       "user":        user,
+                       "password":    password,
+                       "database":    database,
+                       "be_host":     beHost,
+                       "be_port":     bePort,
+                       "tables":      tables,
+                       "batch_size":  batchSize,
+                       "extra":       extra,
                })
        }
        runner.RunCmd(cmd)
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
index 76f4b1b4..bb0879b8 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/task_data.go
@@ -17,6 +17,8 @@ limitations under the License.
 package main
 
 type StarRocksConfig struct {
+       SourceType  string `mapstructure:"source_type"`
+       SourceDsn   string `mapstructure:"source_dsn"`
        Host        string
        Port        int
        User        string
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 61d4989a..ece4131c 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -21,14 +21,17 @@ import (
        "database/sql"
        "encoding/json"
        "fmt"
+       "github.com/apache/incubator-devlake/impl/dalgorm"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
+       "gorm.io/driver/mysql"
+       "gorm.io/driver/postgres"
+       "gorm.io/gorm"
        "io"
        "net/http"
        "net/url"
        "regexp"
        "strings"
-
-       "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
 type Table struct {
@@ -40,8 +43,28 @@ func (t *Table) TableName() string {
 }
 
 func LoadData(c core.SubTaskContext) error {
+       var db dal.Dal
        config := c.GetData().(*StarRocksConfig)
-       db := c.GetDal()
+       if config.SourceDsn != "" && config.SourceType != "" {
+               var o *gorm.DB
+               var err error
+               if config.SourceType == "mysql" {
+                       o, err = gorm.Open(mysql.Open(config.SourceDsn))
+                       if err != nil {
+                               return err
+                       }
+               } else if config.SourceType == "postgres" {
+                       o, err = gorm.Open(postgres.Open(config.SourceDsn))
+                       if err != nil {
+                               return err
+                       }
+               } else {
+                       return fmt.Errorf("unsupported source type %s", 
config.SourceType)
+               }
+               db = dalgorm.NewDalgorm(o)
+       } else {
+               db = c.GetDal()
+       }
        var starrocksTables []string
        if config.DomainLayer != "" {
                starrocksTables = getTablesByDomainLayer(config.DomainLayer)
diff --git a/plugins/starrocks/utils.go b/plugins/starrocks/utils.go
index 0306ec3a..a68c79e5 100644
--- a/plugins/starrocks/utils.go
+++ b/plugins/starrocks/utils.go
@@ -99,6 +99,8 @@ func getDataType(dataType string) string {
                starrocksDatatype = "double"
        } else if stringIn(dataType, "json", "jsonb") {
                starrocksDatatype = "json"
+       } else if dataType == "uuid" {
+               starrocksDatatype = "char(36)"
        }
        return starrocksDatatype
 }

Reply via email to