This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new c235743ef feat: starrocsk plugin add table config (#8267)
c235743ef is described below

commit c235743ef470afbfe18860ab49ae39392de94a8b
Author: long2ice <[email protected]>
AuthorDate: Thu Jan 16 14:47:20 2025 +0800

    feat: starrocsk plugin add table config (#8267)
---
 backend/plugins/starrocks/tasks/task_data.go | 12 +++++++++---
 backend/plugins/starrocks/tasks/tasks.go     | 17 +++++++++++++++--
 2 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/backend/plugins/starrocks/tasks/task_data.go 
b/backend/plugins/starrocks/tasks/task_data.go
index eb106bdab..f178cba6a 100644
--- a/backend/plugins/starrocks/tasks/task_data.go
+++ b/backend/plugins/starrocks/tasks/task_data.go
@@ -17,6 +17,11 @@ limitations under the License.
 
 package tasks
 
+type TableConfig struct {
+       IncludedColumns []string `mapstructure:"included_columns"`
+       ExcludedColumns []string `mapstructure:"excluded_columns"`
+}
+
 type StarRocksConfig struct {
        SourceType   string `mapstructure:"source_type"`
        SourceDsn    string `mapstructure:"source_dsn"`
@@ -29,8 +34,9 @@ type StarRocksConfig struct {
        BeHost       string `mapstructure:"be_host"`
        BePort       int    `mapstructure:"be_port"`
        Tables       []string
-       BatchSize    int               `mapstructure:"batch_size"`
-       OrderBy      map[string]string `mapstructure:"order_by"`
-       DomainLayer  string            `mapstructure:"domain_layer"`
+       TableConfigs map[string]TableConfig `mapstructure:"table_configs"`
+       BatchSize    int                    `mapstructure:"batch_size"`
+       OrderBy      map[string]string      `mapstructure:"order_by"`
+       DomainLayer  string                 `mapstructure:"domain_layer"`
        Extra        map[string]string
 }
diff --git a/backend/plugins/starrocks/tasks/tasks.go 
b/backend/plugins/starrocks/tasks/tasks.go
index cc33db282..b6c60e03d 100644
--- a/backend/plugins/starrocks/tasks/tasks.go
+++ b/backend/plugins/starrocks/tasks/tasks.go
@@ -29,6 +29,8 @@ import (
        "strings"
        "time"
 
+       "golang.org/x/exp/slices"
+
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
@@ -138,7 +140,6 @@ func createTmpTableInStarrocks(dc *DataConfigParams) 
(map[string]string, string,
        table := dc.SrcTableName
        starrocksTable := dc.DestTableName
        starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
-
        columnMetas, err := db.GetColumns(&Table{name: table}, nil)
        updateColumn := config.UpdateColumn
        columnMap := make(map[string]string)
@@ -163,8 +164,21 @@ func createTmpTableInStarrocks(dc *DataConfigParams) 
(map[string]string, string,
        } else {
                return nil, "", false, 
errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
        }
+       tableConfig, ok := config.TableConfigs[table]
        for _, cm := range columnMetas {
                name := cm.Name()
+               if ok {
+                       if len(tableConfig.ExcludedColumns) > 0 {
+                               if slices.Contains(tableConfig.ExcludedColumns, 
name) {
+                                       continue
+                               }
+                       }
+                       if len(tableConfig.IncludedColumns) > 0 {
+                               if 
!slices.Contains(tableConfig.IncludedColumns, name) {
+                                       continue
+                               }
+                       }
+               }
                if name == updateColumn {
                        // check update column to detect skip or not
                        var updatedFrom time.Time
@@ -276,7 +290,6 @@ func copyDataToDst(dc *DataConfigParams, columnMap 
map[string]string, orderBy st
                } else {
                        return err
                }
-
        }
        defer rows.Close()
 

Reply via email to