This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new f7c37abf feat: improve starrocks plugin, add domain_layer option (#2447)
f7c37abf is described below
commit f7c37abf5f9fd2e363a00b9f97458ce9a8a28020
Author: long2ice <[email protected]>
AuthorDate: Fri Jul 8 15:49:46 2022 +0800
feat: improve starrocks plugin, add domain_layer option (#2447)
* feat: add domain_layer option for starrocks plugin
* fix: golang lint
* fix: golang lint
---
.../src/data/pipeline-config-samples/starrocks.js | 3 +-
plugins/starrocks/task_data.go | 19 ++---
plugins/starrocks/tasks.go | 43 ++++++-----
plugins/starrocks/utils.go | 84 ++++++++++++++++++++++
4 files changed, 121 insertions(+), 28 deletions(-)
diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js b/config-ui/src/data/pipeline-config-samples/starrocks.js
index 8e93e87b..aadb152f 100644
--- a/config-ui/src/data/pipeline-config-samples/starrocks.js
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -28,7 +28,8 @@ const starRocksConfig = [
be_port: 8040,
tables: ['_tool_.*'], // support regexp
batch_size: 10000,
- extra: '' // will append to create table sql
+ extra: '', // will append to create table sql
+ domain_layer: '' // priority over tables
}
}
]
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
index aaf907a7..00c0a982 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/task_data.go
@@ -17,13 +17,14 @@ limitations under the License.
package main
type StarRocksConfig struct {
- Host string
- Port int
- User string
- Password string
- Database string
- BePort int `mapstructure:"be_port"`
- Tables []string
- BatchSize int `mapstructure:"batch_size"`
- Extra string
+ Host string
+ Port int
+ User string
+ Password string
+ Database string
+ BePort int `mapstructure:"be_port"`
+ Tables []string
+ BatchSize int `mapstructure:"batch_size"`
+ DomainLayer string `mapstructure:"domain_layer"`
+ Extra string
}
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index be98a7c6..b23e9db3 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -41,28 +41,36 @@ func (t *Table) TableName() string {
func LoadData(c core.SubTaskContext) error {
config := c.GetData().(*StarRocksConfig)
db := c.GetDal()
- tables := config.Tables
- allTables, err := db.AllTables()
- if err != nil {
- return err
- }
var starrocksTables []string
- if len(tables) == 0 {
- starrocksTables = allTables
+ if config.DomainLayer != "" {
+ starrocksTables = getTablesByDomainLayer(config.DomainLayer)
+ if starrocksTables == nil {
+ return fmt.Errorf("no table found by domain layer: %s",
config.DomainLayer)
+ }
} else {
- for _, table := range allTables {
- for _, r := range tables {
- var ok bool
- ok, err = regexp.Match(r, []byte(table))
- if err != nil {
- return err
- }
- if ok {
- starrocksTables =
append(starrocksTables, table)
+ tables := config.Tables
+ allTables, err := db.AllTables()
+ if err != nil {
+ return err
+ }
+ if len(tables) == 0 {
+ starrocksTables = allTables
+ } else {
+ for _, table := range allTables {
+ for _, r := range tables {
+ var ok bool
+ ok, err = regexp.Match(r, []byte(table))
+ if err != nil {
+ return err
+ }
+ if ok {
+ starrocksTables =
append(starrocksTables, table)
+ }
}
}
}
}
+
starrocks, err := sql.Open("mysql",
fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
config.User, config.Password, config.Host, config.Port, config.Database))
if err != nil {
return err
@@ -84,7 +92,6 @@ func LoadData(c core.SubTaskContext) error {
return nil
}
func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table
string, c core.SubTaskContext, extra string) error {
-
columeMetas, err := db.GetColumns(&Table{name: table}, nil)
if err != nil {
return err
@@ -98,7 +105,7 @@ func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table str
if !ok {
return fmt.Errorf("Get [%s] ColumeType Failed", name)
}
- column := fmt.Sprintf("%s %s", name, starrocksDatatype)
+ column := fmt.Sprintf("%s %s", name,
getDataType(starrocksDatatype))
columns = append(columns, column)
isPrimaryKey, ok := cm.PrimaryKey()
if isPrimaryKey && ok {
diff --git a/plugins/starrocks/utils.go b/plugins/starrocks/utils.go
new file mode 100644
index 00000000..0f2f9a00
--- /dev/null
+++ b/plugins/starrocks/utils.go
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+import "strings"
+
+// getTablesByDomainLayer returns the domain-layer table names that belong to
+// the named domain layer: "code", "crossdomain", "devops" or "ticket".
+// Any other name yields nil, which LoadData treats as "no table found".
+// The lookup table is rebuilt on every call, so each caller receives its own
+// slice and may modify it freely.
+func getTablesByDomainLayer(domainLayer string) []string {
+	tablesByLayer := map[string][]string{
+		"code": {
+			"refs_commits_diffs",
+			"pull_requests",
+			"commits",
+			"refs_pr_cherrypicks",
+			"repos",
+			"refs",
+			"pull_request_commits",
+			"repo_commits",
+			"pull_request_labels",
+			"commit_parents",
+			"notes",
+			"pull_request_comments",
+			"commit_files",
+		},
+		"crossdomain": {
+			"pull_request_issues",
+			"users",
+			"issue_commits",
+			"issue_repo_commits",
+			"refs_issues_diffs",
+			"board_repos",
+		},
+		"devops": {
+			"builds",
+			"jobs",
+		},
+		"ticket": {
+			"board_issues",
+			"boards",
+			"changelogs",
+			"issue_comments",
+			"issue_labels",
+			"issues",
+			"sprints",
+			"issue_worklogs",
+			"board_sprints",
+			"sprint_issues",
+		},
+	}
+	// A missing key yields the zero value (a nil slice), matching the
+	// original "return nil" for unknown layers.
+	return tablesByLayer[domainLayer]
+}
+// getDataType translates a source column type string into the type used in the
+// generated StarRocks CREATE TABLE statement.
+// NOTE(review): the input is presumably a MySQL-style column type such as
+// "varchar(255)" or "bigint(20) unsigned" — confirm against the dal column
+// metadata in use.
+//
+// Mapping rules:
+//   - varchar*                         -> string
+//   - datetime* (e.g. datetime(3))     -> datetime
+//   - bigint*   (e.g. bigint unsigned) -> bigint
+//   - tinyint(1)                       -> boolean
+//   - every TEXT/BLOB family member    -> string (StarRocks has no such types)
+//   - anything else is passed through unchanged.
+func getDataType(dataType string) string {
+	switch {
+	case strings.HasPrefix(dataType, "varchar"):
+		return "string"
+	case strings.HasPrefix(dataType, "datetime"):
+		return "datetime"
+	case strings.HasPrefix(dataType, "bigint"):
+		return "bigint"
+	}
+	switch dataType {
+	case "tinyint(1)":
+		return "boolean"
+	// The original mapping covered only text, longtext and longblob. The other
+	// TEXT/BLOB sizes were passed through verbatim, and StarRocks rejects them
+	// in CREATE TABLE.
+	case "tinytext", "text", "mediumtext", "longtext",
+		"tinyblob", "blob", "mediumblob", "longblob":
+		return "string"
+	}
+	return dataType
+}