This is an automated email from the ASF dual-hosted git repository. warren pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit ccb526f6bb3dd8966c3cbe5f63ab0ab38411f42e Author: linyh <[email protected]> AuthorDate: Tue Jun 14 22:27:37 2022 +0800 add some helper for e2e test --- .github/workflows/test-e2e.yml | 1 + e2e/database.go | 22 +++- helpers/e2ehelper/data_flow_tester.go | 124 +++++++++++++++++++-- ...{csv_file_iterator_test.go => csv_file_test.go} | 22 +++- helpers/pluginhelper/csv_file_writer.go | 82 ++++++++++++++ plugins/helper/default_task_context.go | 5 +- 6 files changed, 237 insertions(+), 19 deletions(-) diff --git a/.github/workflows/test-e2e.yml b/.github/workflows/test-e2e.yml index 1a083811..1e42cff0 100644 --- a/.github/workflows/test-e2e.yml +++ b/.github/workflows/test-e2e.yml @@ -43,3 +43,4 @@ jobs: run: | cp .env.example .env make e2e-test + make e2e-plugins diff --git a/e2e/database.go b/e2e/database.go index 42aecb29..c69db039 100644 --- a/e2e/database.go +++ b/e2e/database.go @@ -22,8 +22,10 @@ import ( "fmt" "log" "net/url" + "strings" mysqlGorm "gorm.io/driver/mysql" + postgresGorm "gorm.io/driver/postgres" "gorm.io/gorm" ) @@ -34,10 +36,17 @@ func InitializeDb() (*sql.DB, error) { if err != nil { return nil, err } - if u.Scheme == "mysql" { + + var db *sql.DB + switch strings.ToLower(u.Scheme) { + case "mysql": dbUrl = fmt.Sprintf(("%s@tcp(%s)%s?%s"), u.User.String(), u.Host, u.Path, u.RawQuery) + db, err = sql.Open(u.Scheme, dbUrl) + case "postgresql", "postgres", "pg": + db, err = sql.Open(`pgx`, dbUrl) + default: + return nil, fmt.Errorf("invalid DB_URL:%s", dbUrl) } - db, err := sql.Open("mysql", dbUrl) if err != nil { return nil, err } @@ -58,3 +67,12 @@ func InitializeGormDb() (*gorm.DB, error) { } return db, nil } + +func InitializeGormDb2() (*gorm.DB, error) { + connectionString := "merico:merico@tcp(localhost:3306)/lake" + db, err := gorm.Open(postgresGorm.Open(connectionString)) + if err != nil { + return nil, err + } + return db, nil +} diff --git a/helpers/e2ehelper/data_flow_tester.go b/helpers/e2ehelper/data_flow_tester.go index df9950b4..20625013 100644 --- a/helpers/e2ehelper/data_flow_tester.go +++ b/helpers/e2ehelper/data_flow_tester.go @@ -19,19 +19,24 @@ package e2ehelper import ( "context" + "database/sql" "fmt" - "testing" - "time" - "github.com/apache/incubator-devlake/config" "github.com/apache/incubator-devlake/helpers/pluginhelper" + "github.com/apache/incubator-devlake/impl/dalgorm" "github.com/apache/incubator-devlake/logger" "github.com/apache/incubator-devlake/plugins/core" + "github.com/apache/incubator-devlake/plugins/core/dal" "github.com/apache/incubator-devlake/plugins/helper" "github.com/apache/incubator-devlake/runner" "github.com/spf13/viper" "github.com/stretchr/testify/assert" "gorm.io/gorm" + "gorm.io/gorm/schema" + "os" + "strings" + "testing" + "time" ) // DataFlowTester provides a universal data integrity validation facility to help `Plugin` verifying records between @@ -58,6 +63,7 @@ import ( type DataFlowTester struct { Cfg *viper.Viper Db *gorm.DB + Dal dal.Dal T *testing.T Name string Plugin core.PluginMeta @@ -78,6 +84,7 @@ func NewDataFlowTester(t *testing.T, pluginName string, pluginMeta core.PluginMe return &DataFlowTester{ Cfg: cfg, Db: db, + Dal: dalgorm.NewDalgorm(db), T: t, Name: pluginName, Plugin: pluginMeta, @@ -94,7 +101,7 @@ func (t *DataFlowTester) ImportCsv(csvRelPath string, tableName string) { if err != nil { panic(err) } - t.FlushTable(tableName) + t.MigrateRawTableAndFlush(tableName) // load rows and insert into target table for csvIter.HasNext() { // make sure @@ -106,6 +113,32 @@ func (t *DataFlowTester) ImportCsv(csvRelPath string, tableName string) { } } +// MigrateTableAndFlush migrate table and deletes all records from specified table +func (t *DataFlowTester) MigrateRawTableAndFlush(rawRableName string) { + // flush target table + err := t.Db.Table(rawRableName).AutoMigrate(&helper.RawData{}) + if err != nil { + panic(err) + } + err = t.Db.Exec(fmt.Sprintf("DELETE FROM %s", rawRableName)).Error + if err != nil { + panic(err) + } +} + +// MigrateTableAndFlush migrate table and deletes all records from specified table +func (t *DataFlowTester) MigrateTableAndFlush(dst schema.Tabler) { + // flush target table + err := t.Db.AutoMigrate(dst) + if err != nil { + panic(err) + } + err = t.Db.Delete(dst, `true`).Error + if err != nil { + panic(err) + } +} + // FlushTable deletes all records from specified table func (t *DataFlowTester) FlushTable(tableName string) { // flush target table @@ -127,8 +160,71 @@ func (t *DataFlowTester) Subtask(subtaskMeta core.SubTaskMeta, taskData interfac // VerifyTable reads rows from csv file and compare with records from database one by one. You must specified the // Primary Key Fields with `pkfields` so DataFlowTester could select the exact record from database, as well as which // fields to compare with by specifying `targetfields` parameter. -func (t *DataFlowTester) VerifyTable(tableName string, csvRelPath string, pkfields []string, targetfields []string) { +func (t *DataFlowTester) CreateSnapshotOrVerify(dst schema.Tabler, csvRelPath string, pkfields []string, targetfields []string) { + _, err := os.Stat(csvRelPath) + if err == nil { + t.VerifyTable(dst, csvRelPath, pkfields, targetfields) + return + } + + location, _ := time.LoadLocation(`UTC`) + allFields := []string{} + allFields = append(pkfields, targetfields...) + dbCursor, err := t.Dal.Cursor( + dal.Select(strings.Join(allFields, `,`)), + dal.From(dst.TableName()), + ) + if err != nil { + panic(err) + } + + columns, err := dbCursor.Columns() + if err != nil { + panic(err) + } + csvWriter := pluginhelper.NewCsvFileWriter(csvRelPath, columns) + defer csvWriter.Close() + + // define how to scan value + columnTypes, _ := dbCursor.ColumnTypes() + forScanValues := make([]interface{}, len(allFields)) + for i, columnType := range columnTypes { + if columnType.ScanType().Name() == `Time` || columnType.ScanType().Name() == `NullTime` { + forScanValues[i] = new(sql.NullTime) + } else { + forScanValues[i] = new(string) + } + } + + for dbCursor.Next() { + err = dbCursor.Scan(forScanValues...) + if err != nil { + panic(err) + } + values := make([]string, len(allFields)) + for i := range forScanValues { + switch forScanValues[i].(type) { + case *sql.NullTime: + value := forScanValues[i].(*sql.NullTime) + if value.Valid { + values[i] = value.Time.In(location).Format("2006-01-02T15:04:05.000-07:00") + } else { + values[i] = `` + } + default: + values[i] = fmt.Sprint(*forScanValues[i].(*string)) + } + } + csvWriter.Write(values) + } +} + +// VerifyTable reads rows from csv file and compare with records from database one by one. You must specified the +// Primary Key Fields with `pkfields` so DataFlowTester could select the exact record from database, as well as which +// fields to compare with by specifying `targetfields` parameter. +func (t *DataFlowTester) VerifyTable(dst schema.Tabler, csvRelPath string, pkfields []string, targetfields []string) { csvIter := pluginhelper.NewCsvFileIterator(csvRelPath) + location, _ := time.LoadLocation(`UTC`) defer csvIter.Close() var expectedTotal int64 @@ -139,11 +235,11 @@ func (t *DataFlowTester) VerifyTable(tableName string, csvRelPath string, pkfiel pkvalues = append(pkvalues, expected[pkf]) } actual := make(map[string]interface{}) - where := "" + where := []string{} for _, field := range pkfields { - where += fmt.Sprintf(" %s = ?", field) + where = append(where, fmt.Sprintf(" %s = ?", field)) } - err := t.Db.Table(tableName).Where(where, pkvalues...).Find(actual).Error + err := t.Db.Table(dst.TableName()).Where(strings.Join(where, ` AND `), pkvalues...).Find(actual).Error if err != nil { panic(err) } @@ -152,17 +248,21 @@ func (t *DataFlowTester) VerifyTable(tableName string, csvRelPath string, pkfiel switch actual[field].(type) { // TODO: ensure testing database is in UTC timezone case time.Time: - actualValue = actual[field].(time.Time).Format("2006-01-02 15:04:05.000000000") + if actual[field] != nil { + actualValue = actual[field].(time.Time).In(location).Format("2006-01-02T15:04:05.000-07:00") + } default: - actualValue = fmt.Sprint(actual[field]) + if actual[field] != nil { + actualValue = fmt.Sprint(actual[field]) + } } - assert.Equal(t.T, expected[field], actualValue) + assert.Equal(t.T, expected[field], actualValue, fmt.Sprintf(`%s.%s not match`, dst.TableName(), field)) } expectedTotal++ } var actualTotal int64 - err := t.Db.Table(tableName).Count(&actualTotal).Error + err := t.Db.Table(dst.TableName()).Count(&actualTotal).Error if err != nil { panic(err) } diff --git a/helpers/pluginhelper/csv_file_iterator_test.go b/helpers/pluginhelper/csv_file_test.go similarity index 58% rename from helpers/pluginhelper/csv_file_iterator_test.go rename to helpers/pluginhelper/csv_file_test.go index 9c6c6394..644c88b3 100644 --- a/helpers/pluginhelper/csv_file_iterator_test.go +++ b/helpers/pluginhelper/csv_file_test.go @@ -17,12 +17,26 @@ limitations under the License. package pluginhelper -func ExampleCsvFileIterator() { - iter := NewCsvFileIterator("/path/to/foobar.csv") +import ( + "fmt" + "github.com/magiconair/properties/assert" + "testing" +) + +func TestExampleCsvFile(t *testing.T) { + tmpPath := t.TempDir() + filename := fmt.Sprintf(`%s/foobar.csv`, tmpPath) + println(filename) + + writer := NewCsvFileWriter(filename, []string{"id", "name", "json", "created_at"}) + writer.Write([]string{"123", "foobar", `{"url": "https://example.com"}`, "2022-05-05 09:56:43.438000000"}) + writer.Close() + + iter := NewCsvFileIterator(filename) defer iter.Close() for iter.HasNext() { row := iter.Fetch() - println(row["name"]) // foobar - println(row["json"]) // {"url": "https://example.com"} + assert.Equal(t, row["name"], "foobar", "name not euqal") + assert.Equal(t, row["json"], `{"url": "https://example.com"}`, "json not euqal") } } diff --git a/helpers/pluginhelper/csv_file_writer.go b/helpers/pluginhelper/csv_file_writer.go new file mode 100644 index 00000000..c413c5ad --- /dev/null +++ b/helpers/pluginhelper/csv_file_writer.go @@ -0,0 +1,82 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pluginhelper + +import ( + "encoding/csv" + "os" +) + +// CsvFileWriter make writer for saving csv file easier, it write tuple to csv file +// +// Example CSV format (exported by dbeaver): +// +// "id","name","json","created_at" +// 123,"foobar","{""url"": ""https://example.com""}","2022-05-05 09:56:43.438000000" +// +type CsvFileWriter struct { + file *os.File + writer *csv.Writer + fields []string +} + +// NewCsvFileWriter create a `*CsvFileWriter` based on path to saving csv file +func NewCsvFileWriter(csvPath string, fields []string) *CsvFileWriter { + // open csv file + csvFile, err := os.Create(csvPath) + if err != nil { + panic(err) + } + csvWriter := csv.NewWriter(csvFile) + // write field names + err = csvWriter.Write(fields) + if err != nil { + panic(err) + } + csvWriter.Flush() + if err != nil { + panic(err) + } + return &CsvFileWriter{ + file: csvFile, + writer: csvWriter, + fields: fields, + } +} + +// Close releases resource +func (ci *CsvFileWriter) Close() { + ci.writer.Flush() + err := ci.file.Close() + if err != nil { + panic(err) + } +} + +// Write the values into csv +func (ci *CsvFileWriter) Write(values []string) { + err := ci.writer.Write(values) + if err != nil { + panic(err) + } +} + +// Flush the wrote data into file physically +func (ci *CsvFileWriter) Flush() { + ci.writer.Flush() +} diff --git a/plugins/helper/default_task_context.go b/plugins/helper/default_task_context.go index 9f4b6747..dcccb49d 100644 --- a/plugins/helper/default_task_context.go +++ b/plugins/helper/default_task_context.go @@ -239,7 +239,7 @@ func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext return nil, fmt.Errorf("subtask %s doesn't exist", subtask) } -// This returns a stand-alone core.SubTaskContext, +// NewStandaloneSubTaskContext returns a stand-alone core.SubTaskContext, // not attached to any core.TaskContext. // Use this if you need to run/debug a subtask without // going through the usual workflow. @@ -265,6 +265,9 @@ func (c *DefaultTaskContext) SetData(data interface{}) { var _ core.TaskContext = (*DefaultTaskContext)(nil) func (c *DefaultSubTaskContext) TaskContext() core.TaskContext { + if c.taskCtx == nil { + return nil + } return c.taskCtx }
