This is an automated email from the ASF dual-hosted git repository.
zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 856f6d3b2 Refactor: Database uses json type but not Golang code (#5779)
856f6d3b2 is described below
commit 856f6d3b21d47830dd30db8cf6b879d6140d9c4f
Author: Klesh Wong <[email protected]>
AuthorDate: Wed Aug 2 13:52:00 2023 +0800
Refactor: Database uses json type but not Golang code (#5779)
* refactor: gets rid of json type on golang code for task model
* feat: _devlake_tasks.subtasks uses json type in the db
* fix: config-ui lint
* fix: go.mod should be using golang v1.20
* fix: remove debugging code
---
.../migrationscripts/20230728_tasks_use_json.go | 69 ++++++++++++++++++++++
backend/core/models/migrationscripts/register.go | 1 +
backend/core/models/task.go | 34 ++++-------
backend/core/runner/directrun.go | 13 +---
backend/core/runner/run_task.go | 13 +---
backend/go.mod | 2 +-
backend/helpers/migrationhelper/migrationhelper.go | 4 +-
backend/impls/dalgorm/encdec_serializer.go | 22 +++++--
backend/server/services/pipeline.go | 12 +---
backend/server/services/task.go | 21 ++-----
backend/test/helper/client.go | 12 +---
.../src/pages/pipeline/components/task/index.tsx | 2 +-
config-ui/src/pages/pipeline/types.ts | 2 +-
config-ui/src/pages/project/home/index.tsx | 10 ++--
14 files changed, 122 insertions(+), 95 deletions(-)
diff --git a/backend/core/models/migrationscripts/20230728_tasks_use_json.go
b/backend/core/models/migrationscripts/20230728_tasks_use_json.go
new file mode 100644
index 000000000..df30486d7
--- /dev/null
+++ b/backend/core/models/migrationscripts/20230728_tasks_use_json.go
@@ -0,0 +1,69 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+ "encoding/json"
+
+ "github.com/apache/incubator-devlake/core/context"
+ "github.com/apache/incubator-devlake/core/errors"
+
"github.com/apache/incubator-devlake/core/models/migrationscripts/archived"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/migrationhelper"
+)
+
+var _ plugin.MigrationScript = (*tasksUsesJSON)(nil)
+
+type tasksUsesJSON struct{}
+
+type srcTaskSubtaskJSON20230731 struct {
+ archived.Model
+ Subtasks json.RawMessage
+}
+
+type dstTaskSubtaskJSON20230731 struct {
+ archived.Model
+ Subtasks []string `gorm:"type:json;serializer:json"`
+}
+
+func (script *tasksUsesJSON) Up(basicRes context.BasicRes) errors.Error {
+ return migrationhelper.TransformColumns(
+ basicRes,
+ script,
+ "_devlake_tasks",
+ []string{"subtasks"},
+ func(src *srcTaskSubtaskJSON20230731)
(*dstTaskSubtaskJSON20230731, errors.Error) {
+ dst := &dstTaskSubtaskJSON20230731{
+ Model: src.Model,
+ }
+ if len(src.Subtasks) == 0 {
+ return nil, nil
+ }
+ errors.Must(json.Unmarshal(src.Subtasks, &dst.Subtasks))
+ return dst, nil
+ },
+ )
+}
+
+func (*tasksUsesJSON) Version() uint64 {
+ return 20230728162121
+}
+
+func (*tasksUsesJSON) Name() string {
+ return "tasks uses json"
+}
diff --git a/backend/core/models/migrationscripts/register.go
b/backend/core/models/migrationscripts/register.go
index 248260244..fce6fd239 100644
--- a/backend/core/models/migrationscripts/register.go
+++ b/backend/core/models/migrationscripts/register.go
@@ -88,5 +88,6 @@ func All() []plugin.MigrationScript {
new(addUpdatedDateToIssueComments),
new(addApiKeyTables),
new(addIssueRelationship),
+ new(tasksUsesJSON),
}
}
diff --git a/backend/core/models/task.go b/backend/core/models/task.go
index 60f16832f..336d11f39 100644
--- a/backend/core/models/task.go
+++ b/backend/core/models/task.go
@@ -18,12 +18,10 @@ limitations under the License.
package models
import (
- "encoding/json"
- "github.com/apache/incubator-devlake/core/errors"
+ "time"
+
"github.com/apache/incubator-devlake/core/models/common"
plugin "github.com/apache/incubator-devlake/core/plugin"
- "gorm.io/datatypes"
- "time"
)
const (
@@ -49,14 +47,14 @@ type TaskProgressDetail struct {
type Task struct {
common.Model
- Plugin string `json:"plugin" gorm:"index"`
- Subtasks datatypes.JSON `json:"subtasks"`
- Options string `json:"options"
gorm:"serializer:encdec"`
- Status string `json:"status"`
- Message string `json:"message"`
- ErrorName string `json:"errorName"`
- Progress float32 `json:"progress"`
- ProgressDetail *TaskProgressDetail `json:"progressDetail" gorm:"-"`
+ Plugin string `json:"plugin" gorm:"index"`
+ Subtasks []string `json:"subtasks"
gorm:"type:json;serializer:json"`
+ Options map[string]interface{} `json:"options"
gorm:"serializer:encdec"`
+ Status string `json:"status"`
+ Message string `json:"message"`
+ ErrorName string `json:"errorName"`
+ Progress float32 `json:"progress"`
+ ProgressDetail *TaskProgressDetail `json:"progressDetail" gorm:"-"`
FailedSubTask string `json:"failedSubTask"`
PipelineId uint64 `json:"pipelineId" gorm:"index"`
@@ -93,15 +91,3 @@ func (Task) TableName() string {
func (Subtask) TableName() string {
return "_devlake_subtasks"
}
-
-func (task *Task) GetSubTasks() ([]string, errors.Error) {
- var subtasks []string
- err := errors.Convert(json.Unmarshal(task.Subtasks, &subtasks))
- return subtasks, err
-}
-
-func (task *Task) GetOptions() (map[string]interface{}, errors.Error) {
- var options map[string]interface{}
- err := errors.Convert(json.Unmarshal([]byte(task.Options), &options))
- return options, err
-}
diff --git a/backend/core/runner/directrun.go b/backend/core/runner/directrun.go
index 5c9f49426..cb6935b9e 100644
--- a/backend/core/runner/directrun.go
+++ b/backend/core/runner/directrun.go
@@ -19,7 +19,6 @@ package runner
import (
"context"
- "encoding/json"
goerror "errors"
"fmt"
"io"
@@ -80,18 +79,10 @@ func DirectRun(cmd *cobra.Command, args []string,
pluginTask plugin.PluginTask,
panic(err)
}
ctx := createContext()
- optionsJson, err := json.Marshal(options)
- if err != nil {
- panic(err)
- }
- subtasksJson, err := json.Marshal(tasks)
- if err != nil {
- panic(err)
- }
task := &models.Task{
Plugin: cmd.Use,
- Options: string(optionsJson),
- Subtasks: subtasksJson,
+ Options: options,
+ Subtasks: tasks,
}
err = RunPluginSubTasks(
ctx,
diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go
index d87468fee..c0f80fa9f 100644
--- a/backend/core/runner/run_task.go
+++ b/backend/core/runner/run_task.go
@@ -189,14 +189,10 @@ func RunPluginSubTasks(
*/
// user specifies what subtasks to run
- subtaskNames, err := task.GetSubTasks()
- if err != nil {
- return err
- }
- if len(subtaskNames) != 0 {
+ if len(task.Subtasks) != 0 {
// decode user specified subtasks
var specifiedTasks []string
- err := api.Decode(subtaskNames, &specifiedTasks, nil)
+ err := api.Decode(task.Subtasks, &specifiedTasks, nil)
if err != nil {
return errors.Default.Wrap(err, "subtasks could not be
decoded")
}
@@ -235,10 +231,7 @@ func RunPluginSubTasks(
if closeablePlugin, ok := pluginTask.(plugin.CloseablePluginTask); ok {
defer closeablePlugin.Close(taskCtx)
}
- options, err := task.GetOptions()
- if err != nil {
- return err
- }
+ options := task.Options
taskData, err := pluginTask.PrepareTaskData(taskCtx, options)
if err != nil {
return errors.Default.Wrap(err, fmt.Sprintf("error preparing
task data for %s", task.Plugin))
diff --git a/backend/go.mod b/backend/go.mod
index 2bd432502..b13d81719 100644
--- a/backend/go.mod
+++ b/backend/go.mod
@@ -1,6 +1,6 @@
module github.com/apache/incubator-devlake
-go 1.19
+go 1.20
require (
github.com/aws/aws-sdk-go v1.44.242
diff --git a/backend/helpers/migrationhelper/migrationhelper.go
b/backend/helpers/migrationhelper/migrationhelper.go
index 86cb341bf..c1247e469 100644
--- a/backend/helpers/migrationhelper/migrationhelper.go
+++ b/backend/helpers/migrationhelper/migrationhelper.go
@@ -163,10 +163,12 @@ func TransformColumns[S any, D any](
}
dst, err := transform(src)
-
if err != nil {
return errors.Default.Wrap(err,
fmt.Sprintf("failed to update row %v", src))
}
+ if dst == nil {
+ continue
+ }
err = batch.Add(dst)
if err != nil {
return errors.Default.Wrap(err,
fmt.Sprintf("push to BatchSave failed %v", dst))
diff --git a/backend/impls/dalgorm/encdec_serializer.go
b/backend/impls/dalgorm/encdec_serializer.go
index 508402d34..bb6589195 100644
--- a/backend/impls/dalgorm/encdec_serializer.go
+++ b/backend/impls/dalgorm/encdec_serializer.go
@@ -57,14 +57,19 @@ func (es *EncDecSerializer) Scan(ctx context.Context, field
*schema.Field, dst r
return err
}
switch fieldValue.Elem().Kind() {
- case reflect.Slice:
- bytes := []byte(decrypted)
- _ = json.Unmarshal(bytes, fieldValue.Interface())
- field.ReflectValueOf(ctx, dst).Set(fieldValue.Elem())
case reflect.String:
field.ReflectValueOf(ctx, dst).SetString(decrypted)
default:
- return fmt.Errorf("failed to decrypt value: %#v",
dbValue)
+ if len(decrypted) == 0 {
+ return nil
+ }
+ // deal with complex type
+ bytes := []byte(decrypted)
+ err := json.Unmarshal(bytes, fieldValue.Interface())
+ if err != nil {
+ return err
+ }
+ field.ReflectValueOf(ctx, dst).Set(fieldValue.Elem())
}
}
return nil
@@ -79,7 +84,12 @@ func (es *EncDecSerializer) Value(ctx context.Context, field
*schema.Field, dst
case string:
target = v
default:
- return nil, fmt.Errorf("failed to encrypt value: %#v",
fieldValue)
+ // deal with complex type
+ b, err := json.Marshal(fieldValue)
+ if err != nil {
+ return nil, err
+ }
+ target = string(b)
}
return plugin.Encrypt(es.encryptionSecret, target)
}
diff --git a/backend/server/services/pipeline.go
b/backend/server/services/pipeline.go
index 1a1f3006a..d8dd1e0e6 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -508,19 +508,11 @@ func RerunPipeline(pipelineId uint64, task *models.Task)
(tasks []*models.Task,
return nil, err
}
// create new task
- subtasks, err := t.GetSubTasks()
- if err != nil {
- return nil, err
- }
- options, err := t.GetOptions()
- if err != nil {
- return nil, err
- }
rerunTask, err := CreateTask(&models.NewTask{
PipelineTask: &plugin.PipelineTask{
Plugin: t.Plugin,
- Subtasks: subtasks,
- Options: options,
+ Subtasks: t.Subtasks,
+ Options: t.Options,
},
PipelineId: t.PipelineId,
PipelineRow: t.PipelineRow,
diff --git a/backend/server/services/task.go b/backend/server/services/task.go
index c230980b5..aab13bc29 100644
--- a/backend/server/services/task.go
+++ b/backend/server/services/task.go
@@ -19,15 +19,15 @@ package services
import (
"context"
- "encoding/json"
"fmt"
+ "regexp"
+ "strings"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/log"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/impls/logruslog"
- "regexp"
- "strings"
)
var taskLog = logruslog.Global.Nested("task service")
@@ -44,19 +44,10 @@ type TaskQuery struct {
// CreateTask creates a new task
func CreateTask(newTask *models.NewTask) (*models.Task, errors.Error) {
- b, err := json.Marshal(newTask.Options)
- if err != nil {
- return nil, errors.Convert(err)
- }
- s, err := json.Marshal(newTask.Subtasks)
- if err != nil {
- return nil, errors.Convert(err)
- }
-
task := &models.Task{
Plugin: newTask.Plugin,
- Subtasks: s,
- Options: string(b),
+ Subtasks: newTask.Subtasks,
+ Options: newTask.Options,
Status: models.TASK_CREATED,
Message: "",
PipelineId: newTask.PipelineId,
@@ -66,7 +57,7 @@ func CreateTask(newTask *models.NewTask) (*models.Task,
errors.Error) {
if newTask.IsRerun {
task.Status = models.TASK_RERUN
}
- err = db.Create(task)
+ err := db.Create(task)
if err != nil {
taskLog.Error(err, "save task failed")
return nil, errors.Internal.Wrap(err, "save task failed")
diff --git a/backend/test/helper/client.go b/backend/test/helper/client.go
index 229b322f9..8ac739d0d 100644
--- a/backend/test/helper/client.go
+++ b/backend/test/helper/client.go
@@ -229,18 +229,10 @@ func (d *DevlakeClient) RunPlugin(ctx context.Context,
pluginName string, plugin
if len(subtaskNames) == 0 {
subtaskNames = GetSubtaskNames(pluginTask.SubTaskMetas()...)
}
- optionsJson, err := json.Marshal(options)
- if err != nil {
- return errors.Convert(err)
- }
- subtasksJson, err := json.Marshal(subtaskNames)
- if err != nil {
- return errors.Convert(err)
- }
task := &models.Task{
Plugin: pluginName,
- Options: string(optionsJson),
- Subtasks: subtasksJson,
+ Options: options,
+ Subtasks: subtaskNames,
}
return runner.RunPluginSubTasks(
ctx,
diff --git a/config-ui/src/pages/pipeline/components/task/index.tsx
b/config-ui/src/pages/pipeline/components/task/index.tsx
index f59e6506d..297a6a57b 100644
--- a/config-ui/src/pages/pipeline/components/task/index.tsx
+++ b/config-ui/src/pages/pipeline/components/task/index.tsx
@@ -39,7 +39,7 @@ export const PipelineTask = ({ task }: Props) => {
const [icon, name] = useMemo(() => {
const config = getPluginConfig(task.plugin);
- const options = JSON.parse(task.options);
+ const options = task.options;
let name = config.name;
diff --git a/config-ui/src/pages/pipeline/types.ts
b/config-ui/src/pages/pipeline/types.ts
index 49b757d12..275a230eb 100644
--- a/config-ui/src/pages/pipeline/types.ts
+++ b/config-ui/src/pages/pipeline/types.ts
@@ -47,7 +47,7 @@ export type TaskType = {
pipelineCol: number;
beganAt: string | null;
finishedAt: string | null;
- options: string;
+ options: any;
message: string;
progressDetail?: {
finishedSubTasks: number;
diff --git a/config-ui/src/pages/project/home/index.tsx
b/config-ui/src/pages/project/home/index.tsx
index 23150cf07..10da4f1f1 100644
--- a/config-ui/src/pages/project/home/index.tsx
+++ b/config-ui/src/pages/project/home/index.tsx
@@ -51,9 +51,9 @@ export const ProjectHomePage = () => {
(data?.projects ?? []).map((it) => {
return {
name: it.name,
- connections: it.blueprint.settings.connections,
- isManual: it.blueprint.isManual,
- cronConfig: it.blueprint.cronConfig,
+ connections: it.blueprint?.settings.connections,
+ isManual: it.blueprint?.isManual,
+ cronConfig: it.blueprint?.cronConfig,
createdAt: it.createdAt,
lastRunCompletedAt: it.lastPipeline?.finishedAt,
lastRunStatus: it.lastPipeline?.status,
@@ -137,12 +137,12 @@ export const ProjectHomePage = () => {
dataIndex: 'connections',
key: 'connections',
render: (val: BlueprintType['settings']['connections']) =>
- !val.length
+ !val || !val.length
? 'N/A'
: val
.map((it) => {
const cs = onGet(`${it.plugin}-${it.connectionId}`);
- return cs.name;
+ return cs?.name;
})
.join(', '),
},