This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new e8877a41 feat: add be_host and fix hash columns
e8877a41 is described below

commit e8877a41c5cda7db8d181ef6926c8a605369045e
Author: Jinlong Peng <[email protected]>
AuthorDate: Fri Aug 19 16:08:22 2022 +0800

    feat: add be_host and fix hash columns
---
 .../src/data/pipeline-config-samples/starrocks.js  |  1 +
 plugins/starrocks/starrocks.go                     |  7 +++-
 plugins/starrocks/task_data.go                     |  5 ++-
 plugins/starrocks/tasks.go                         | 46 +++++++++++++++-------
 4 files changed, 42 insertions(+), 17 deletions(-)

diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js 
b/config-ui/src/data/pipeline-config-samples/starrocks.js
index aadb152f..40631d2b 100644
--- a/config-ui/src/data/pipeline-config-samples/starrocks.js
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -25,6 +25,7 @@ const starRocksConfig = [
         user: 'root',
         password: '',
         database: 'lake',
+        be_host: '',
         be_port: 8040,
         tables: ['_tool_.*'], // support regexp
         batch_size: 10000,
diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go
index f22903de..eb3f6109 100644
--- a/plugins/starrocks/starrocks.go
+++ b/plugins/starrocks/starrocks.go
@@ -6,7 +6,7 @@ The ASF licenses this file to You under the Apache License, 
Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at
 
-    http://www.apache.org/licenses/LICENSE-2.0
+       http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
@@ -37,6 +37,9 @@ func (s StarRocks) PrepareTaskData(taskCtx core.TaskContext, 
options map[string]
        if err != nil {
                return nil, err
        }
+       if op.BeHost == "" {
+               op.BeHost = op.Host
+       }
        return &op, nil
 }
 
@@ -61,6 +64,7 @@ func main() {
        _ = cmd.MarkFlagRequired("port")
        port := cmd.Flags().StringP("port", "p", "", "StarRocks port")
        _ = cmd.MarkFlagRequired("port")
+       beHost := cmd.Flags().StringP("be_host", "BH", "", "StarRocks be host")
        bePort := cmd.Flags().StringP("be_port", "BP", "", "StarRocks be port")
        _ = cmd.MarkFlagRequired("user")
        user := cmd.Flags().StringP("user", "u", "", "StarRocks user")
@@ -81,6 +85,7 @@ func main() {
                        "user":       user,
                        "password":   password,
                        "database":   database,
+                       "be_host":    beHost,
                        "be_port":    bePort,
                        "tables":     tables,
                        "batch_size": batchSize,
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
index 00c0a982..76f4b1b4 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/task_data.go
@@ -6,7 +6,7 @@ The ASF licenses this file to You under the Apache License, 
Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at
 
-    http://www.apache.org/licenses/LICENSE-2.0
+       http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,7 +22,8 @@ type StarRocksConfig struct {
        User        string
        Password    string
        Database    string
-       BePort      int `mapstructure:"be_port"`
+       BeHost      string `mapstructure:"be_host"`
+       BePort      int    `mapstructure:"be_port"`
        Tables      []string
        BatchSize   int    `mapstructure:"batch_size"`
        DomainLayer string `mapstructure:"domain_layer"`
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 4f776f65..3bf9cfd3 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -21,8 +21,9 @@ import (
        "database/sql"
        "encoding/json"
        "fmt"
-       "io/ioutil"
+       "io"
        "net/http"
+       "net/url"
        "regexp"
        "strings"
 
@@ -96,7 +97,7 @@ func createTable(starrocks *sql.DB, db dal.Dal, 
starrocksTable string, table str
        if err != nil {
                return err
        }
-       var pks string
+       var pks []string
        var columns []string
        firstcm := ""
        for _, cm := range columeMetas {
@@ -109,22 +110,19 @@ func createTable(starrocks *sql.DB, db dal.Dal, 
starrocksTable string, table str
                columns = append(columns, column)
                isPrimaryKey, ok := cm.PrimaryKey()
                if isPrimaryKey && ok {
-                       if pks != "" {
-                               pks += ","
-                       }
-                       pks += name
+                       pks = append(pks, fmt.Sprintf("`%s`", name))
                }
                if firstcm == "" {
-                       firstcm = name
+                       firstcm = fmt.Sprintf("`%s`", name)
                }
        }
 
-       if pks == "" {
-               pks = firstcm
+       if len(pks) == 0 {
+               pks = append(pks, firstcm)
        }
 
        if extra == "" {
-               extra = fmt.Sprintf(`engine=olap distributed by hash(%s) 
properties("replication_num" = "1")`, pks)
+               extra = fmt.Sprintf(`engine=olap distributed by hash(%s) 
properties("replication_num" = "1")`, strings.Join(pks, ", "))
        }
        tableSql := fmt.Sprintf("create table if not exists `%s` ( %s ) %s", 
starrocksTable, strings.Join(columns, ","), extra)
        c.GetLogger().Info(tableSql)
@@ -172,7 +170,7 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, 
starrocksTable string, t
                        break
                }
                // insert data to tmp table
-               url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", 
config.Host, config.BePort, config.Database, starrocksTmpTable)
+               loadURL := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", 
config.BeHost, config.BePort, config.Database, starrocksTmpTable)
                headers := map[string]string{
                        "format":            "json",
                        "strip_outer_array": "true",
@@ -183,8 +181,12 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, 
starrocksTable string, t
                if err != nil {
                        return err
                }
-               client := http.Client{}
-               req, err := http.NewRequest(http.MethodPut, url, 
bytes.NewBuffer(jsonData))
+               client := http.Client{
+                       CheckRedirect: func(req *http.Request, via 
[]*http.Request) error {
+                               return http.ErrUseLastResponse
+                       },
+               }
+               req, err := http.NewRequest(http.MethodPut, loadURL, 
bytes.NewBuffer(jsonData))
                if err != nil {
                        return err
                }
@@ -193,10 +195,26 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, 
starrocksTable string, t
                        req.Header.Set(k, v)
                }
                resp, err := client.Do(req)
+               if err != nil && err != http.ErrUseLastResponse {
+                       return err
+               }
+               if err == http.ErrUseLastResponse {
+                       var location *url.URL
+                       location, err = resp.Location()
+                       req, err = http.NewRequest(http.MethodPut, 
location.String(), bytes.NewBuffer(jsonData))
+                       if err != nil {
+                               return err
+                       }
+                       req.SetBasicAuth(config.User, config.Password)
+                       for k, v := range headers {
+                               req.Header.Set(k, v)
+                       }
+                       resp, err = client.Do(req)
+               }
                if err != nil {
                        return err
                }
-               b, err := ioutil.ReadAll(resp.Body)
+               b, err := io.ReadAll(resp.Body)
                if err != nil {
                        return err
                }

Reply via email to