This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 856f6d3b2 Refactor: Database uses json type but not Golang code (#5779)
856f6d3b2 is described below

commit 856f6d3b21d47830dd30db8cf6b879d6140d9c4f
Author: Klesh Wong <[email protected]>
AuthorDate: Wed Aug 2 13:52:00 2023 +0800

    Refactor: Database uses json type but not Golang code (#5779)
    
    * refactor: gets rid of json type on golang code for task model
    
    * feat: _devlake_tasks.subtasks uses json type in the db
    
    * fix: config-ui lint
    
    * fix: go.mod should be using golang v1.20
    
    * fix: remove debugging code
---
 .../migrationscripts/20230728_tasks_use_json.go    | 69 ++++++++++++++++++++++
 backend/core/models/migrationscripts/register.go   |  1 +
 backend/core/models/task.go                        | 34 ++++-------
 backend/core/runner/directrun.go                   | 13 +---
 backend/core/runner/run_task.go                    | 13 +---
 backend/go.mod                                     |  2 +-
 backend/helpers/migrationhelper/migrationhelper.go |  4 +-
 backend/impls/dalgorm/encdec_serializer.go         | 22 +++++--
 backend/server/services/pipeline.go                | 12 +---
 backend/server/services/task.go                    | 21 ++-----
 backend/test/helper/client.go                      | 12 +---
 .../src/pages/pipeline/components/task/index.tsx   |  2 +-
 config-ui/src/pages/pipeline/types.ts              |  2 +-
 config-ui/src/pages/project/home/index.tsx         | 10 ++--
 14 files changed, 122 insertions(+), 95 deletions(-)

diff --git a/backend/core/models/migrationscripts/20230728_tasks_use_json.go 
b/backend/core/models/migrationscripts/20230728_tasks_use_json.go
new file mode 100644
index 000000000..df30486d7
--- /dev/null
+++ b/backend/core/models/migrationscripts/20230728_tasks_use_json.go
@@ -0,0 +1,69 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+       "encoding/json"
+
+       "github.com/apache/incubator-devlake/core/context"
+       "github.com/apache/incubator-devlake/core/errors"
+       
"github.com/apache/incubator-devlake/core/models/migrationscripts/archived"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
+)
+
+var _ plugin.MigrationScript = (*tasksUsesJSON)(nil)
+
+type tasksUsesJSON struct{}
+
+type srcTaskSubtaskJSON20230731 struct {
+       archived.Model
+       Subtasks json.RawMessage
+}
+
+type dstTaskSubtaskJSON20230731 struct {
+       archived.Model
+       Subtasks []string `gorm:"type:json;serializer:json"`
+}
+
+func (script *tasksUsesJSON) Up(basicRes context.BasicRes) errors.Error {
+       return migrationhelper.TransformColumns(
+               basicRes,
+               script,
+               "_devlake_tasks",
+               []string{"subtasks"},
+               func(src *srcTaskSubtaskJSON20230731) 
(*dstTaskSubtaskJSON20230731, errors.Error) {
+                       dst := &dstTaskSubtaskJSON20230731{
+                               Model: src.Model,
+                       }
+                       if len(src.Subtasks) == 0 {
+                               return nil, nil
+                       }
+                       errors.Must(json.Unmarshal(src.Subtasks, &dst.Subtasks))
+                       return dst, nil
+               },
+       )
+}
+
+func (*tasksUsesJSON) Version() uint64 {
+       return 20230728162121
+}
+
+func (*tasksUsesJSON) Name() string {
+       return "tasks uses json"
+}
diff --git a/backend/core/models/migrationscripts/register.go 
b/backend/core/models/migrationscripts/register.go
index 248260244..fce6fd239 100644
--- a/backend/core/models/migrationscripts/register.go
+++ b/backend/core/models/migrationscripts/register.go
@@ -88,5 +88,6 @@ func All() []plugin.MigrationScript {
                new(addUpdatedDateToIssueComments),
                new(addApiKeyTables),
                new(addIssueRelationship),
+               new(tasksUsesJSON),
        }
 }
diff --git a/backend/core/models/task.go b/backend/core/models/task.go
index 60f16832f..336d11f39 100644
--- a/backend/core/models/task.go
+++ b/backend/core/models/task.go
@@ -18,12 +18,10 @@ limitations under the License.
 package models
 
 import (
-       "encoding/json"
-       "github.com/apache/incubator-devlake/core/errors"
+       "time"
+
        "github.com/apache/incubator-devlake/core/models/common"
        plugin "github.com/apache/incubator-devlake/core/plugin"
-       "gorm.io/datatypes"
-       "time"
 )
 
 const (
@@ -49,14 +47,14 @@ type TaskProgressDetail struct {
 
 type Task struct {
        common.Model
-       Plugin         string              `json:"plugin" gorm:"index"`
-       Subtasks       datatypes.JSON      `json:"subtasks"`
-       Options        string              `json:"options" 
gorm:"serializer:encdec"`
-       Status         string              `json:"status"`
-       Message        string              `json:"message"`
-       ErrorName      string              `json:"errorName"`
-       Progress       float32             `json:"progress"`
-       ProgressDetail *TaskProgressDetail `json:"progressDetail" gorm:"-"`
+       Plugin         string                 `json:"plugin" gorm:"index"`
+       Subtasks       []string               `json:"subtasks" 
gorm:"type:json;serializer:json"`
+       Options        map[string]interface{} `json:"options" 
gorm:"serializer:encdec"`
+       Status         string                 `json:"status"`
+       Message        string                 `json:"message"`
+       ErrorName      string                 `json:"errorName"`
+       Progress       float32                `json:"progress"`
+       ProgressDetail *TaskProgressDetail    `json:"progressDetail" gorm:"-"`
 
        FailedSubTask string     `json:"failedSubTask"`
        PipelineId    uint64     `json:"pipelineId" gorm:"index"`
@@ -93,15 +91,3 @@ func (Task) TableName() string {
 func (Subtask) TableName() string {
        return "_devlake_subtasks"
 }
-
-func (task *Task) GetSubTasks() ([]string, errors.Error) {
-       var subtasks []string
-       err := errors.Convert(json.Unmarshal(task.Subtasks, &subtasks))
-       return subtasks, err
-}
-
-func (task *Task) GetOptions() (map[string]interface{}, errors.Error) {
-       var options map[string]interface{}
-       err := errors.Convert(json.Unmarshal([]byte(task.Options), &options))
-       return options, err
-}
diff --git a/backend/core/runner/directrun.go b/backend/core/runner/directrun.go
index 5c9f49426..cb6935b9e 100644
--- a/backend/core/runner/directrun.go
+++ b/backend/core/runner/directrun.go
@@ -19,7 +19,6 @@ package runner
 
 import (
        "context"
-       "encoding/json"
        goerror "errors"
        "fmt"
        "io"
@@ -80,18 +79,10 @@ func DirectRun(cmd *cobra.Command, args []string, 
pluginTask plugin.PluginTask,
                panic(err)
        }
        ctx := createContext()
-       optionsJson, err := json.Marshal(options)
-       if err != nil {
-               panic(err)
-       }
-       subtasksJson, err := json.Marshal(tasks)
-       if err != nil {
-               panic(err)
-       }
        task := &models.Task{
                Plugin:   cmd.Use,
-               Options:  string(optionsJson),
-               Subtasks: subtasksJson,
+               Options:  options,
+               Subtasks: tasks,
        }
        err = RunPluginSubTasks(
                ctx,
diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go
index d87468fee..c0f80fa9f 100644
--- a/backend/core/runner/run_task.go
+++ b/backend/core/runner/run_task.go
@@ -189,14 +189,10 @@ func RunPluginSubTasks(
        */
 
        // user specifies what subtasks to run
-       subtaskNames, err := task.GetSubTasks()
-       if err != nil {
-               return err
-       }
-       if len(subtaskNames) != 0 {
+       if len(task.Subtasks) != 0 {
                // decode user specified subtasks
                var specifiedTasks []string
-               err := api.Decode(subtaskNames, &specifiedTasks, nil)
+               err := api.Decode(task.Subtasks, &specifiedTasks, nil)
                if err != nil {
                        return errors.Default.Wrap(err, "subtasks could not be 
decoded")
                }
@@ -235,10 +231,7 @@ func RunPluginSubTasks(
        if closeablePlugin, ok := pluginTask.(plugin.CloseablePluginTask); ok {
                defer closeablePlugin.Close(taskCtx)
        }
-       options, err := task.GetOptions()
-       if err != nil {
-               return err
-       }
+       options := task.Options
        taskData, err := pluginTask.PrepareTaskData(taskCtx, options)
        if err != nil {
                return errors.Default.Wrap(err, fmt.Sprintf("error preparing 
task data for %s", task.Plugin))
diff --git a/backend/go.mod b/backend/go.mod
index 2bd432502..b13d81719 100644
--- a/backend/go.mod
+++ b/backend/go.mod
@@ -1,6 +1,6 @@
 module github.com/apache/incubator-devlake
 
-go 1.19
+go 1.20
 
 require (
        github.com/aws/aws-sdk-go v1.44.242
diff --git a/backend/helpers/migrationhelper/migrationhelper.go 
b/backend/helpers/migrationhelper/migrationhelper.go
index 86cb341bf..c1247e469 100644
--- a/backend/helpers/migrationhelper/migrationhelper.go
+++ b/backend/helpers/migrationhelper/migrationhelper.go
@@ -163,10 +163,12 @@ func TransformColumns[S any, D any](
                                }
 
                                dst, err := transform(src)
-
                                if err != nil {
                                        return errors.Default.Wrap(err, 
fmt.Sprintf("failed to update row %v", src))
                                }
+                               if dst == nil {
+                                       continue
+                               }
                                err = batch.Add(dst)
                                if err != nil {
                                        return errors.Default.Wrap(err, 
fmt.Sprintf("push to BatchSave failed %v", dst))
diff --git a/backend/impls/dalgorm/encdec_serializer.go 
b/backend/impls/dalgorm/encdec_serializer.go
index 508402d34..bb6589195 100644
--- a/backend/impls/dalgorm/encdec_serializer.go
+++ b/backend/impls/dalgorm/encdec_serializer.go
@@ -57,14 +57,19 @@ func (es *EncDecSerializer) Scan(ctx context.Context, field 
*schema.Field, dst r
                        return err
                }
                switch fieldValue.Elem().Kind() {
-               case reflect.Slice:
-                       bytes := []byte(decrypted)
-                       _ = json.Unmarshal(bytes, fieldValue.Interface())
-                       field.ReflectValueOf(ctx, dst).Set(fieldValue.Elem())
                case reflect.String:
                        field.ReflectValueOf(ctx, dst).SetString(decrypted)
                default:
-                       return fmt.Errorf("failed to decrypt value: %#v", 
dbValue)
+                       if len(decrypted) == 0 {
+                               return nil
+                       }
+                       // deal with complex type
+                       bytes := []byte(decrypted)
+                       err := json.Unmarshal(bytes, fieldValue.Interface())
+                       if err != nil {
+                               return err
+                       }
+                       field.ReflectValueOf(ctx, dst).Set(fieldValue.Elem())
                }
        }
        return nil
@@ -79,7 +84,12 @@ func (es *EncDecSerializer) Value(ctx context.Context, field 
*schema.Field, dst
        case string:
                target = v
        default:
-               return nil, fmt.Errorf("failed to encrypt value: %#v", 
fieldValue)
+               // deal with complex type
+               b, err := json.Marshal(fieldValue)
+               if err != nil {
+                       return nil, err
+               }
+               target = string(b)
        }
        return plugin.Encrypt(es.encryptionSecret, target)
 }
diff --git a/backend/server/services/pipeline.go 
b/backend/server/services/pipeline.go
index 1a1f3006a..d8dd1e0e6 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -508,19 +508,11 @@ func RerunPipeline(pipelineId uint64, task *models.Task) 
(tasks []*models.Task,
                        return nil, err
                }
                // create new task
-               subtasks, err := t.GetSubTasks()
-               if err != nil {
-                       return nil, err
-               }
-               options, err := t.GetOptions()
-               if err != nil {
-                       return nil, err
-               }
                rerunTask, err := CreateTask(&models.NewTask{
                        PipelineTask: &plugin.PipelineTask{
                                Plugin:   t.Plugin,
-                               Subtasks: subtasks,
-                               Options:  options,
+                               Subtasks: t.Subtasks,
+                               Options:  t.Options,
                        },
                        PipelineId:  t.PipelineId,
                        PipelineRow: t.PipelineRow,
diff --git a/backend/server/services/task.go b/backend/server/services/task.go
index c230980b5..aab13bc29 100644
--- a/backend/server/services/task.go
+++ b/backend/server/services/task.go
@@ -19,15 +19,15 @@ package services
 
 import (
        "context"
-       "encoding/json"
        "fmt"
+       "regexp"
+       "strings"
+
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/log"
        "github.com/apache/incubator-devlake/core/models"
        "github.com/apache/incubator-devlake/impls/logruslog"
-       "regexp"
-       "strings"
 )
 
 var taskLog = logruslog.Global.Nested("task service")
@@ -44,19 +44,10 @@ type TaskQuery struct {
 
 // CreateTask creates a new task
 func CreateTask(newTask *models.NewTask) (*models.Task, errors.Error) {
-       b, err := json.Marshal(newTask.Options)
-       if err != nil {
-               return nil, errors.Convert(err)
-       }
-       s, err := json.Marshal(newTask.Subtasks)
-       if err != nil {
-               return nil, errors.Convert(err)
-       }
-
        task := &models.Task{
                Plugin:      newTask.Plugin,
-               Subtasks:    s,
-               Options:     string(b),
+               Subtasks:    newTask.Subtasks,
+               Options:     newTask.Options,
                Status:      models.TASK_CREATED,
                Message:     "",
                PipelineId:  newTask.PipelineId,
@@ -66,7 +57,7 @@ func CreateTask(newTask *models.NewTask) (*models.Task, 
errors.Error) {
        if newTask.IsRerun {
                task.Status = models.TASK_RERUN
        }
-       err = db.Create(task)
+       err := db.Create(task)
        if err != nil {
                taskLog.Error(err, "save task failed")
                return nil, errors.Internal.Wrap(err, "save task failed")
diff --git a/backend/test/helper/client.go b/backend/test/helper/client.go
index 229b322f9..8ac739d0d 100644
--- a/backend/test/helper/client.go
+++ b/backend/test/helper/client.go
@@ -229,18 +229,10 @@ func (d *DevlakeClient) RunPlugin(ctx context.Context, 
pluginName string, plugin
        if len(subtaskNames) == 0 {
                subtaskNames = GetSubtaskNames(pluginTask.SubTaskMetas()...)
        }
-       optionsJson, err := json.Marshal(options)
-       if err != nil {
-               return errors.Convert(err)
-       }
-       subtasksJson, err := json.Marshal(subtaskNames)
-       if err != nil {
-               return errors.Convert(err)
-       }
        task := &models.Task{
                Plugin:   pluginName,
-               Options:  string(optionsJson),
-               Subtasks: subtasksJson,
+               Options:  options,
+               Subtasks: subtaskNames,
        }
        return runner.RunPluginSubTasks(
                ctx,
diff --git a/config-ui/src/pages/pipeline/components/task/index.tsx 
b/config-ui/src/pages/pipeline/components/task/index.tsx
index f59e6506d..297a6a57b 100644
--- a/config-ui/src/pages/pipeline/components/task/index.tsx
+++ b/config-ui/src/pages/pipeline/components/task/index.tsx
@@ -39,7 +39,7 @@ export const PipelineTask = ({ task }: Props) => {
 
   const [icon, name] = useMemo(() => {
     const config = getPluginConfig(task.plugin);
-    const options = JSON.parse(task.options);
+    const options = task.options;
 
     let name = config.name;
 
diff --git a/config-ui/src/pages/pipeline/types.ts 
b/config-ui/src/pages/pipeline/types.ts
index 49b757d12..275a230eb 100644
--- a/config-ui/src/pages/pipeline/types.ts
+++ b/config-ui/src/pages/pipeline/types.ts
@@ -47,7 +47,7 @@ export type TaskType = {
   pipelineCol: number;
   beganAt: string | null;
   finishedAt: string | null;
-  options: string;
+  options: any;
   message: string;
   progressDetail?: {
     finishedSubTasks: number;
diff --git a/config-ui/src/pages/project/home/index.tsx 
b/config-ui/src/pages/project/home/index.tsx
index 23150cf07..10da4f1f1 100644
--- a/config-ui/src/pages/project/home/index.tsx
+++ b/config-ui/src/pages/project/home/index.tsx
@@ -51,9 +51,9 @@ export const ProjectHomePage = () => {
       (data?.projects ?? []).map((it) => {
         return {
           name: it.name,
-          connections: it.blueprint.settings.connections,
-          isManual: it.blueprint.isManual,
-          cronConfig: it.blueprint.cronConfig,
+          connections: it.blueprint?.settings.connections,
+          isManual: it.blueprint?.isManual,
+          cronConfig: it.blueprint?.cronConfig,
           createdAt: it.createdAt,
           lastRunCompletedAt: it.lastPipeline?.finishedAt,
           lastRunStatus: it.lastPipeline?.status,
@@ -137,12 +137,12 @@ export const ProjectHomePage = () => {
             dataIndex: 'connections',
             key: 'connections',
             render: (val: BlueprintType['settings']['connections']) =>
-              !val.length
+              !val || !val.length
                 ? 'N/A'
                 : val
                     .map((it) => {
                       const cs = onGet(`${it.plugin}-${it.connectionId}`);
-                      return cs.name;
+                      return cs?.name;
                     })
                     .join(', '),
           },

Reply via email to