This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new c235743ef feat: starrocsk plugin add table config (#8267)
c235743ef is described below
commit c235743ef470afbfe18860ab49ae39392de94a8b
Author: long2ice <[email protected]>
AuthorDate: Thu Jan 16 14:47:20 2025 +0800
feat: starrocsk plugin add table config (#8267)
---
backend/plugins/starrocks/tasks/task_data.go | 12 +++++++++---
backend/plugins/starrocks/tasks/tasks.go | 17 +++++++++++++++--
2 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/backend/plugins/starrocks/tasks/task_data.go b/backend/plugins/starrocks/tasks/task_data.go
index eb106bdab..f178cba6a 100644
--- a/backend/plugins/starrocks/tasks/task_data.go
+++ b/backend/plugins/starrocks/tasks/task_data.go
@@ -17,6 +17,11 @@ limitations under the License.
package tasks
+type TableConfig struct {
+ IncludedColumns []string `mapstructure:"included_columns"`
+ ExcludedColumns []string `mapstructure:"excluded_columns"`
+}
+
type StarRocksConfig struct {
SourceType string `mapstructure:"source_type"`
SourceDsn string `mapstructure:"source_dsn"`
@@ -29,8 +34,9 @@ type StarRocksConfig struct {
BeHost string `mapstructure:"be_host"`
BePort int `mapstructure:"be_port"`
Tables []string
- BatchSize int `mapstructure:"batch_size"`
- OrderBy map[string]string `mapstructure:"order_by"`
- DomainLayer string `mapstructure:"domain_layer"`
+ TableConfigs map[string]TableConfig `mapstructure:"table_configs"`
+ BatchSize int `mapstructure:"batch_size"`
+ OrderBy map[string]string `mapstructure:"order_by"`
+ DomainLayer string `mapstructure:"domain_layer"`
Extra map[string]string
}
diff --git a/backend/plugins/starrocks/tasks/tasks.go b/backend/plugins/starrocks/tasks/tasks.go
index cc33db282..b6c60e03d 100644
--- a/backend/plugins/starrocks/tasks/tasks.go
+++ b/backend/plugins/starrocks/tasks/tasks.go
@@ -29,6 +29,8 @@ import (
"strings"
"time"
+ "golang.org/x/exp/slices"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
@@ -138,7 +140,6 @@ func createTmpTableInStarrocks(dc *DataConfigParams) (map[string]string, string,
table := dc.SrcTableName
starrocksTable := dc.DestTableName
starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
-
columnMetas, err := db.GetColumns(&Table{name: table}, nil)
updateColumn := config.UpdateColumn
columnMap := make(map[string]string)
@@ -163,8 +164,21 @@ func createTmpTableInStarrocks(dc *DataConfigParams) (map[string]string, string,
} else {
return nil, "", false, errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
}
+ tableConfig, ok := config.TableConfigs[table]
for _, cm := range columnMetas {
name := cm.Name()
+ if ok {
+ if len(tableConfig.ExcludedColumns) > 0 {
+ if slices.Contains(tableConfig.ExcludedColumns, name) {
+ continue
+ }
+ }
+ if len(tableConfig.IncludedColumns) > 0 {
+ if !slices.Contains(tableConfig.IncludedColumns, name) {
+ continue
+ }
+ }
+ }
if name == updateColumn {
// check update column to detect skip or not
var updatedFrom time.Time
@@ -276,7 +290,6 @@ func copyDataToDst(dc *DataConfigParams, columnMap map[string]string, orderBy st
} else {
return err
}
-
}
defer rows.Close()