This is an automated email from the ASF dual-hosted git repository.
zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 12ac469f7 replace gorm with dal from services module (#3986)
12ac469f7 is described below
commit 12ac469f7a96dd79e0b63276dfc0e8cc220bac0e
Author: Klesh Wong <[email protected]>
AuthorDate: Wed Dec 21 11:02:52 2022 +0800
replace gorm with dal from services module (#3986)
* refactor: add ConfigReader to BasicRes interface
* refactor: use ConfigReader in `services`
* refactor: replace grom with dal from service mod
* fix: rerun sawllowing errors
* fix: cancelled pipelines turned to failed after restart
* fix: listing pipelines api not working
* fix: listing projects api not working
* fix: keep tasks statuses as is for diagnose
* fix: filter by label
* fix: make rerun great again
* fix: bp label filer
* fix: patch project enable
* feat: re-organise debugging scripts
* refactor: remove gorm
* fix: linting errors
* fix: for cr
---
api/blueprints/blueprints.go | 40 +--
api/pipelines/pipelines.go | 69 ++--
api/project/project.go | 159 +---------
api/router.go | 9 +-
api/task/task.go | 89 +-----
impl/dalgorm/dalgorm.go | 5 +
models/project.go | 4 +-
models/task.go | 1 +
plugins/core/dal/dal.go | 4 +-
plugins/core/plugin_handle.go | 28 --
runner/run_pipeline.go | 2 +-
scripts/pm/framework/api-test.sh | 60 ++++
...ine-get-detail.sh => blueprint-v200-trigger.sh} | 4 +-
scripts/pm/framework/pipeline-cancel.sh | 2 +-
scripts/pm/framework/pipeline-get-detail.sh | 2 +-
.../{pipeline-get-detail.sh => pipeline-rerun.sh} | 4 +-
.../{pipeline-get-detail.sh => task-rerun.sh} | 4 +-
.../{create-blueprint.sh => blueprint-create.sh} | 0
...-blueprint-v200.sh => blueprint-v200-create.sh} | 4 +-
scripts/pm/github/{user.sh => gh-user-get.sh} | 0
.../{trigger-pipline.sh => pipeline-create.sh} | 4 +-
scripts/pm/github/{put-repo.sh => repo-create.sh} | 0
...ationrules.sh => transformationrules-create.sh} | 0
services/base.go | 61 ++++
services/blueprint.go | 52 +--
services/blueprint_helper.go | 108 +++----
services/blueprint_makeplan_v200.go | 14 +-
services/dm_code.go | 14 +-
services/init.go | 33 +-
services/notification.go | 7 +-
services/page_helper.go | 35 ---
services/pipeline.go | 184 +++++++++--
services/pipeline_helper.go | 90 +++---
services/pipeline_runner.go | 10 +-
services/project.go | 349 ++++++++++-----------
services/project_helper.go | 276 ----------------
services/pushapi.go | 13 +-
services/task.go | 249 +++------------
services/task_runner.go | 167 ++++++++++
39 files changed, 892 insertions(+), 1264 deletions(-)
diff --git a/api/blueprints/blueprints.go b/api/blueprints/blueprints.go
index 40cbeecb3..3581d326c 100644
--- a/api/blueprints/blueprints.go
+++ b/api/blueprints/blueprints.go
@@ -111,26 +111,26 @@ func Get(c *gin.Context) {
shared.ApiOutputSuccess(c, blueprint, http.StatusOK)
}
-// @Summary delete blueprints
-// @Description Delete BluePrints
-// @Tags framework/blueprints
-// @Param blueprintId path string true "blueprintId"
-// @Success 200
-// @Failure 400 {object} shared.ApiBody "Bad Request"
-// @Failure 500 {object} shared.ApiBody "Internal Error"
-// @Router /blueprints/{blueprintId} [delete]
-func Delete(c *gin.Context) {
- pipelineId := c.Param("blueprintId")
- id, err := strconv.ParseUint(pipelineId, 10, 64)
- if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad
blueprintID format supplied"))
- return
- }
- err = services.DeleteBlueprint(id)
- if err != nil {
- shared.ApiOutputError(c, errors.Default.Wrap(err, "error
deleting blueprint"))
- }
-}
+// // @Summary delete blueprints
+// // @Description Delete BluePrints
+// // @Tags framework/blueprints
+// // @Param blueprintId path string true "blueprintId"
+// // @Success 200
+// // @Failure 400 {object} shared.ApiBody "Bad Request"
+// // @Failure 500 {object} shared.ApiBody "Internal Error"
+// // @Router /blueprints/{blueprintId} [delete]
+// func Delete(c *gin.Context) {
+// pipelineId := c.Param("blueprintId")
+// id, err := strconv.ParseUint(pipelineId, 10, 64)
+// if err != nil {
+// shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad
blueprintID format supplied"))
+// return
+// }
+// err = services.DeleteBlueprint(id)
+// if err != nil {
+// shared.ApiOutputError(c, errors.Default.Wrap(err, "error
deleting blueprint"))
+// }
+// }
// @Summary patch blueprints
// @Description patch blueprints
diff --git a/api/pipelines/pipelines.go b/api/pipelines/pipelines.go
index c35ab5d19..b020e3f53 100644
--- a/api/pipelines/pipelines.go
+++ b/api/pipelines/pipelines.go
@@ -31,17 +31,6 @@ import (
"github.com/gin-gonic/gin/binding"
)
-/*
-Create and run a new pipeline
-POST /pipelines
-{
- "name": "name-of-pipeline",
- "tasks": [
- [ {"plugin": "gitlab", ...}, {"plugin": "jira"} ],
- [ {"plugin": "github", ...}],
- ]
-}
-*/
// @Summary Create and run a new pipeline
// @Description Create and run a new pipeline
// @Tags framework/pipelines
@@ -69,17 +58,6 @@ func Post(c *gin.Context) {
shared.ApiOutputSuccess(c, pipeline, http.StatusCreated)
}
-/*
-Get list of pipelines
-GET /pipelines?status=TASK_RUNNING&pending=1&page=1&pagesize=10
-{
- "pipelines": [
- {"id": 1, "name": "test-pipeline", ...}
- ],
- "count": 5
-}
-*/
-
// @Summary Get list of pipelines
// @Description GET
/pipelines?status=TASK_RUNNING&pending=1&label=search_text&page=1&pagesize=10
// @Tags framework/pipelines
@@ -108,16 +86,7 @@ func Index(c *gin.Context) {
shared.ApiOutputSuccess(c, shared.ResponsePipelines{Pipelines:
pipelines, Count: count}, http.StatusOK)
}
-/*
-Get detail of a pipeline
-GET /pipelines/:pipelineId
-{
- "id": 1,
- "name": "test-pipeline",
- ...
-}
-*/
-// @Get detail of a pipeline
+// @Summary Get detail of a pipeline
// @Description GET /pipelines/:pipelineId
// @Description RETURN SAMPLE
// @Description {
@@ -146,11 +115,7 @@ func Get(c *gin.Context) {
shared.ApiOutputSuccess(c, pipeline, http.StatusOK)
}
-/*
-Cancel a pending pipeline
-DELETE /pipelines/:pipelineId
-*/
-// @Cancel a pending pipeline
+// @Summary Cancel a pending pipeline
// @Description Cancel a pending pipeline
// @Tags framework/pipelines
// @Param pipelineId path int true "pipeline ID"
@@ -173,11 +138,7 @@ func Delete(c *gin.Context) {
shared.ApiOutputSuccess(c, nil, http.StatusOK)
}
-/*
-Get download logs of a pipeline
-GET /pipelines/:pipelineId/logging.tar.gz
-*/
-// download logs of a pipeline
+// @Summary download logs of a pipeline
// @Description GET /pipelines/:pipelineId/logging.tar.gz
// @Tags framework/pipelines
// @Param pipelineId path int true "query"
@@ -206,3 +167,27 @@ func DownloadLogs(c *gin.Context) {
defer os.Remove(archive)
c.FileAttachment(archive, filepath.Base(archive))
}
+
+// RerunPipeline rerun all failed tasks of the specified pipeline
+// @Summary rerun tasks
+// @Tags framework/pipeline
+// @Accept application/json
+// @Param pipelineId path int true "pipelineId"
+// @Success 200 {object} []models.Task
+// @Failure 400 {object} shared.ApiBody "Bad Request"
+// @Failure 500 {object} shared.ApiBody "Internal Error"
+// @Router /pipeline/{pipelineId}/rerun [post]
+func PostRerun(c *gin.Context) {
+ pipelineId := c.Param("pipelineId")
+ id, err := strconv.ParseUint(pipelineId, 10, 64)
+ if err != nil {
+ shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad
pipelineID format supplied"))
+ return
+ }
+ rerunTasks, err := services.RerunPipeline(id, nil)
+ if err != nil {
+ shared.ApiOutputError(c, errors.Default.Wrap(err, "failed to
rerun pipeline"))
+ return
+ }
+ shared.ApiOutputSuccess(c, rerunTasks, http.StatusOK)
+}
diff --git a/api/project/project.go b/api/project/project.go
index 786d8a75f..22e25b524 100644
--- a/api/project/project.go
+++ b/api/project/project.go
@@ -18,7 +18,6 @@ limitations under the License.
package project
import (
- "fmt"
"net/http"
"github.com/apache/incubator-devlake/api/shared"
@@ -29,8 +28,8 @@ import (
)
type PaginatedProjects struct {
- Projects []*models.BaseProject `json:"projects"`
- Count int64 `json:"count"`
+ Projects []*models.Project `json:"projects"`
+ Count int64 `json:"count"`
}
// @Summary Create and run a new project
@@ -43,22 +42,13 @@ type PaginatedProjects struct {
// @Failure 500 {string} errcode.Error "Internal Error"
// @Router /projects/:projectName [get]
func GetProject(c *gin.Context) {
- projectOutput := &models.ApiOutputProject{}
projectName := c.Param("projectName")
- project, err := services.GetProject(projectName)
+ projectOutput, err := services.GetProject(projectName)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error
getting project"))
return
}
-
- projectOutput.BaseProject = project.BaseProject
- err = services.LoadBluePrintAndMetrics(projectOutput)
- if err != nil {
- shared.ApiOutputError(c, errors.Default.Wrap(err,
fmt.Sprintf("Failed to LoadBluePrintAndMetrics on GetProject for %s",
projectOutput.Name)))
- return
- }
-
shared.ApiOutputSuccess(c, projectOutput, http.StatusOK)
}
@@ -84,13 +74,8 @@ func GetProjects(c *gin.Context) {
return
}
- baseProjects := make([]*models.BaseProject, count)
- for i, project := range projects {
- baseProjects[i] = &project.BaseProject
- }
-
shared.ApiOutputSuccess(c, PaginatedProjects{
- Projects: baseProjects,
+ Projects: projects,
Count: count,
}, http.StatusOK)
}
@@ -106,45 +91,18 @@ func GetProjects(c *gin.Context) {
// @Router /projects [post]
func PostProject(c *gin.Context) {
projectInput := &models.ApiInputProject{}
- projectOutput := &models.ApiOutputProject{}
-
err := c.ShouldBind(projectInput)
if err != nil {
shared.ApiOutputError(c, errors.BadInput.Wrap(err,
shared.BadRequestBody))
return
}
- err = services.CreateProject(&models.Project{BaseProject:
projectInput.BaseProject})
+ projectOutput, err := services.CreateProject(projectInput)
if err != nil {
shared.ApiOutputError(c, errors.BadInput.Wrap(err, "error
creating project"))
return
}
- // check if need to changed the blueprint setting
- if projectInput.Enable != nil {
- _, err =
services.PatchBlueprintEnableByProjectName(projectInput.Name,
*projectInput.Enable)
- if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err,
"Failed to set if project enable"))
- return
- }
- }
-
- // check if need flush the Metrics
- if projectInput.Metrics != nil {
- err = services.FlushProjectMetrics(projectInput.Name,
projectInput.Metrics)
- if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err,
"Failed to flush project metrics"))
- return
- }
- }
-
- projectOutput.BaseProject = projectInput.BaseProject
- err = services.LoadBluePrintAndMetrics(projectOutput)
- if err != nil {
- shared.ApiOutputError(c, errors.Default.Wrap(err,
fmt.Sprintf("Failed to LoadBluePrintAndMetrics on PostProject for %s",
projectOutput.Name)))
- return
- }
-
shared.ApiOutputSuccess(c, projectOutput, http.StatusCreated)
}
@@ -175,110 +133,3 @@ func PatchProject(c *gin.Context) {
shared.ApiOutputSuccess(c, projectOutput, http.StatusCreated)
}
-
-// @Cancel a project
-// @Description Cancel a project
-// @Tags framework/projects
-// @Success 200
-// @Failure 400 {string} er2rcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internel Error"
-// @Router /projects/:projectName [delete]
-//func DeleteProject(c *gin.Context) {
-//}
-
-// @Summary Get a ProjectMetrics
-// @Description Get a ProjectMetrics
-// @Tags framework/ProjectMetrics
-// @Param projectName path string true "project name"
-// @Param pluginName path string true "plugin name"
-// @Success 200 {object} models.BaseProjectMetric
-// @Failure 400 {string} errcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internel Error"
-// @Router /projects/:projectName/metrics/:pluginName [get]
-func GetProjectMetrics(c *gin.Context) {
- projectName := c.Param("projectName")
- pluginName := c.Param("pluginName")
-
- projectMetric, err := services.GetProjectMetric(projectName, pluginName)
- if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err, "error
getting project metric"))
- return
- }
-
- shared.ApiOutputSuccess(c, projectMetric.BaseProjectMetric,
http.StatusOK)
-}
-
-// @Summary Create a new ProjectMetrics
-// @Description Create a new ProjectMetrics
-// @Tags framework/ProjectMetrics
-// @Accept application/json
-// @Param project body models.BaseProjectMetric true "json"
-// @Success 200 {object} models.BaseProjectMetric
-// @Failure 400 {string} errcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internal Error"
-// @Router /projects/:projectName/metrics [post]
-func PostProjectMetrics(c *gin.Context) {
- projectMetric := &models.BaseProjectMetric{}
-
- projectName := c.Param("projectName")
-
- _, err1 := services.GetProject(projectName)
- if err1 != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err1,
shared.BadRequestBody))
- return
- }
-
- err := c.ShouldBind(projectMetric)
- if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err,
shared.BadRequestBody))
- return
- }
-
- projectMetric.ProjectName = projectName
- err =
services.CreateProjectMetric(&models.ProjectMetric{BaseProjectMetric:
*projectMetric})
- if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err, "error
creating project metric"))
- return
- }
-
- shared.ApiOutputSuccess(c, projectMetric, http.StatusCreated)
-}
-
-// @Summary Patch a ProjectMetrics
-// @Description Patch a ProjectMetrics
-// @Tags framework/ProjectMetrics
-// @Accept application/json
-// @Param ProjectMetrics body models.BaseProjectMetric true "json"
-// @Success 200 {object} models.BaseProjectMetric
-// @Failure 400 {string} errcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internal Error"
-// @Router /projects/:projectName/metrics/:pluginName [patch]
-func PatchProjectMetrics(c *gin.Context) {
- projectName := c.Param("projectName")
- pluginName := c.Param("pluginName")
-
- var body map[string]interface{}
- err := c.ShouldBind(&body)
- if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err,
shared.BadRequestBody))
- return
- }
-
- projectMetric, err := services.PatchProjectMetric(projectName,
pluginName, body)
- if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err, "error patch
project metric"))
- return
- }
-
- shared.ApiOutputSuccess(c, projectMetric.BaseProjectMetric,
http.StatusCreated)
-}
-
-// @delete a ProjectMetrics
-// @Description delete a ProjectMetrics
-// @Tags framework/ProjectMetrics
-// @Success 200
-// @Failure 400 {string} errcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internel Error"
-// @Router /project_metrics/:projectName/:pluginName [delete]
-//func DeleteProjectMetrics(c *gin.Context) {
-//}
diff --git a/api/router.go b/api/router.go
index 1c50f6236..d7311c4fd 100644
--- a/api/router.go
+++ b/api/router.go
@@ -51,7 +51,8 @@ func RegisterRouter(r *gin.Engine) {
r.GET("/blueprints/:blueprintId/pipelines",
blueprints.GetBlueprintPipelines)
r.DELETE("/pipelines/:pipelineId", pipelines.Delete)
r.GET("/pipelines/:pipelineId/tasks", task.GetTaskByPipeline)
- r.POST("/pipelines/:pipelineId/tasks", task.RerunTask)
+ r.POST("/pipelines/:pipelineId/rerun", pipelines.PostRerun)
+ r.POST("/tasks/:taskId/rerun", task.PostRerun)
r.GET("/pipelines/:pipelineId/logging.tar.gz", pipelines.DownloadLogs)
@@ -71,12 +72,6 @@ func RegisterRouter(r *gin.Engine) {
r.POST("/projects", project.PostProject)
r.GET("/projects", project.GetProjects)
- // project metric api
- r.GET("/projects/:projectName/metrics/:pluginName",
project.GetProjectMetrics)
- r.PATCH("/projects/:projectName/metrics/:pluginName",
project.PatchProjectMetrics)
- //r.DELETE("/projects/:projectName/metrics/:pluginName",
project.DeleteProjectMetrics)
- r.POST("/projects/:projectName/metrics", project.PostProjectMetrics)
-
// mount all api resources for all plugins
pluginsApiResources, err := services.GetPluginsApiResources()
if err != nil {
diff --git a/api/task/task.go b/api/task/task.go
index 17bee977a..83dd9cb83 100644
--- a/api/task/task.go
+++ b/api/task/task.go
@@ -44,8 +44,8 @@ func Delete(c *gin.Context) {
}
type getTaskResponse struct {
- Tasks []models.Task `json:"tasks"`
- Count int `json:"count"`
+ Tasks []*models.Task `json:"tasks"`
+ Count int `json:"count"`
}
// GetTaskByPipeline return most recent tasks
@@ -71,88 +71,25 @@ func GetTaskByPipeline(c *gin.Context) {
shared.ApiOutputSuccess(c, getTaskResponse{Tasks: tasks, Count:
len(tasks)}, http.StatusOK)
}
-type rerunRequest struct {
- TaskId uint64 `json:"taskId"`
-}
-
-// RerunTask rerun the specified the task. If taskId is 0, all failed tasks of
this pipeline will rerun
-// @Summary rerun tasks
+// RerunTask rerun the specified task.
+// @Summary rerun task
// @Tags framework/task
// @Accept application/json
-// @Param pipelineId path int true "pipelineId"
-// @Param request body rerunRequest false "specify the task to rerun. If it's
0, all failed tasks of this pipeline will rerun"
-// @Success 200 {object} shared.ApiBody
+// @Success 200 {object} models.Task
// @Failure 400 {object} shared.ApiBody "Bad Request"
// @Failure 500 {object} shared.ApiBody "Internal Error"
-// @Router /pipelines/{pipelineId}/tasks [post]
-func RerunTask(c *gin.Context) {
- var request rerunRequest
- err := c.BindJSON(&request)
- if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid
task ID format"))
- return
- }
- pipelineId, err := strconv.ParseUint(c.Param("pipelineId"), 10, 64)
- if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid
pipeline ID format"))
- return
- }
- pipeline, err := services.GetPipeline(pipelineId)
- if err != nil {
- shared.ApiOutputError(c, errors.Default.Wrap(err, "error get
pipeline"))
- return
- }
- if pipeline.Status == models.TASK_RUNNING {
- shared.ApiOutputError(c, errors.BadInput.New("pipeline is
running"))
- return
- }
- if pipeline.Status == models.TASK_CREATED || pipeline.Status ==
models.TASK_RERUN {
- shared.ApiOutputError(c, errors.BadInput.New("pipeline is
waiting to run"))
- return
- }
-
- var failedTasks []models.Task
- if request.TaskId > 0 {
- failedTask, err := services.GetTask(request.TaskId)
- if err != nil || failedTask == nil {
- shared.ApiOutputError(c, errors.Default.Wrap(err,
"error getting failed task"))
- return
- }
- if failedTask.PipelineId != pipelineId {
- shared.ApiOutputError(c, errors.BadInput.New("the task
ID and pipeline ID doesn't match"))
- return
- }
- failedTasks = append(failedTasks, *failedTask)
- } else {
- tasks, err := services.GetTasksWithLastStatus(pipelineId)
- if err != nil {
- shared.ApiOutputError(c, errors.Default.Wrap(err,
"error getting tasks"))
- return
- }
- for _, task := range tasks {
- if task.Status == models.TASK_FAILED {
- failedTasks = append(failedTasks, task)
- }
- }
- }
- if len(failedTasks) == 0 {
- shared.ApiOutputSuccess(c, nil, http.StatusOK)
- return
- }
- err = services.DeleteCreatedTasks(pipelineId)
- if err != nil {
- shared.ApiOutputError(c, errors.Default.Wrap(err, "error delete
tasks"))
- return
- }
- _, err = services.SpawnTasks(failedTasks)
+// @Router /tasks/{taskId}/rerun [post]
+func PostRerun(c *gin.Context) {
+ taskId := c.Param("taskId")
+ id, err := strconv.ParseUint(taskId, 10, 64)
if err != nil {
- shared.ApiOutputError(c, errors.Default.Wrap(err, "error create
tasks"))
+ shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad taskId
format supplied"))
return
}
- err = services.UpdateDbPipelineStatus(pipelineId, models.TASK_RERUN)
+ task, err := services.RerunTask(id)
if err != nil {
- shared.ApiOutputError(c, errors.Default.Wrap(err, "error create
tasks"))
+ shared.ApiOutputError(c, err)
return
}
- shared.ApiOutputSuccess(c, nil, http.StatusOK)
+ shared.ApiOutputSuccess(c, task, http.StatusOK)
}
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index b239456fb..a9a2e8403 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -360,6 +360,11 @@ func (d *Dalgorm) IsErrorNotFound(err errors.Error) bool {
return errors.Is(err, gorm.ErrRecordNotFound)
}
+// IsDuplicationError checking if the sql error is not found.
+func (d *Dalgorm) IsDuplicationError(err errors.Error) bool {
+ return strings.Contains(err.Error(), "duplicate")
+}
+
// NewDalgorm creates a *Dalgorm
func NewDalgorm(db *gorm.DB) *Dalgorm {
return &Dalgorm{db}
diff --git a/models/project.go b/models/project.go
index d3596b6a8..2b7c4ab22 100644
--- a/models/project.go
+++ b/models/project.go
@@ -20,7 +20,7 @@ package models
import "github.com/apache/incubator-devlake/models/common"
type BaseProject struct {
- Name string `json:"name" mapstructure:"name"
gorm:"primaryKey;type:varchar(255)"`
+ Name string `json:"name" mapstructure:"name"
gorm:"primaryKey;type:varchar(255)" validate:"required"`
Description string `json:"description" mapstructure:"description"
gorm:"type:text"`
}
@@ -34,7 +34,7 @@ func (Project) TableName() string {
}
type BaseMetric struct {
- PluginName string `json:"pluginName" mapstructure:"pluginName"
gorm:"primaryKey;type:varchar(255)"`
+ PluginName string `json:"pluginName" mapstructure:"pluginName"
gorm:"primaryKey;type:varchar(255)" validate:"required"`
PluginOption string `json:"pluginOption" mapstructure:"pluginOption"
gorm:"type:text"`
Enable bool `json:"enable" mapstructure:"enable"
gorm:"type:boolean"`
}
diff --git a/models/task.go b/models/task.go
index bfc9d8ab7..7f09c3437 100644
--- a/models/task.go
+++ b/models/task.go
@@ -72,6 +72,7 @@ type NewTask struct {
PipelineId uint64 `json:"-"`
PipelineRow int `json:"-"`
PipelineCol int `json:"-"`
+ IsRerun bool `json:"-"`
}
type Subtask struct {
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 46077b753..90f546645 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -140,8 +140,10 @@ type Dal interface {
Session(config SessionConfig) Dal
// Begin create a new transaction
Begin() Transaction
- // checking if the sql error is not found.
+ // IsErrorNotFound returns true if error is record-not-found
IsErrorNotFound(err errors.Error) bool
+ // IsDuplicationError returns true if error is duplicate-error
+ IsDuplicationError(err errors.Error) bool
}
type Transaction interface {
diff --git a/plugins/core/plugin_handle.go b/plugins/core/plugin_handle.go
deleted file mode 100644
index 3bbe371be..000000000
--- a/plugins/core/plugin_handle.go
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package core
-
-import (
- "github.com/apache/incubator-devlake/errors"
-)
-
-type PluginHandle interface {
- // When the projectName in the framework layer is changed
- // this interface will be checked and this method will be called to
synchronize the projectName of each plugin
- RenameProjectName(oldName string, newName string) errors.Error
-}
diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index c964ad27c..7c0d36cc0 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -37,7 +37,7 @@ func RunPipeline(
var tasks []models.Task
err := db.All(
&tasks,
- dal.Where("pipeline_id = ? AND status = ?", pipelineId,
models.TASK_CREATED),
+ dal.Where("pipeline_id = ? AND status in ?", pipelineId,
[]string{models.TASK_CREATED, models.TASK_RERUN}),
dal.Orderby("pipeline_row, pipeline_col"),
)
if err != nil {
diff --git a/scripts/pm/framework/api-test.sh b/scripts/pm/framework/api-test.sh
new file mode 100755
index 000000000..758aa937c
--- /dev/null
+++ b/scripts/pm/framework/api-test.sh
@@ -0,0 +1,60 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+. "$(dirname $0)/../vars/active-vars.sh"
+
+request() {
+ response="$(curl -s "$LAKE_ENDPOINT/$1" | jq --color-output)"
+ echo "
+ INDEX: $2 URL: $LAKE_ENDPOINT/$1
+ $response
+ " | less --RAW-CONTROL-CHARS
+ read PAUSE
+}
+
+start=$1
+length=$2
+counter=0
+for url in \
+"pipelines" \
+"pipelines?pageSize=1&page=1" \
+"pipelines?pageSize=1&page=2" \
+"pipelines?blueprint_id=1" \
+"pipelines?status=TASK_FAILED" \
+"pipelines?pending=1" \
+"pipelines?label=foobar" \
+"blueprints" \
+"blueprints?pageSize=1&page=1" \
+"blueprints?pageSize=1&page=2" \
+"blueprints?enable=1" \
+"blueprints?is_manual=0" \
+"blueprints?label=foobar" \
+; do
+ counter=$(($counter+1))
+ if [ "$start" = "-l" ]; then
+ printf "#%-3s %s\n" "$counter" "$url"
+ continue
+ fi
+ if [ -n "$start" ] && [ "$counter" -lt "$start" ]; then
+ continue
+ fi
+ if [ -n "$length" ] && [ "$counter" -ge $(($start+$length)) ]; then
+ continue
+ fi
+ request "$url" $counter
+done
\ No newline at end of file
diff --git a/scripts/pm/framework/pipeline-get-detail.sh
b/scripts/pm/framework/blueprint-v200-trigger.sh
similarity index 90%
copy from scripts/pm/framework/pipeline-get-detail.sh
copy to scripts/pm/framework/blueprint-v200-trigger.sh
index 7153c67a2..910e2b7ea 100755
--- a/scripts/pm/framework/pipeline-get-detail.sh
+++ b/scripts/pm/framework/blueprint-v200-trigger.sh
@@ -18,6 +18,6 @@
. "$(dirname $0)/../vars/active-vars.sh"
-pipeline_id=${1-"2"}
+blueprint_id=${1-"3"}
-curl -sv $LAKE_ENDPOINT/pipelines/$pipeline_id | jq
+curl -sv -XPOST $LAKE_ENDPOINT/blueprints/$blueprint_id/trigger | jq
\ No newline at end of file
diff --git a/scripts/pm/framework/pipeline-cancel.sh
b/scripts/pm/framework/pipeline-cancel.sh
index 1c1f1bdb4..e62c74534 100755
--- a/scripts/pm/framework/pipeline-cancel.sh
+++ b/scripts/pm/framework/pipeline-cancel.sh
@@ -17,6 +17,6 @@
#
. "$(dirname $0)/../vars/active-vars.sh"
-ID=${1-17}
+ID=${1-32}
curl -sv -XDELETE $LAKE_ENDPOINT/pipelines/$ID | jq
diff --git a/scripts/pm/framework/pipeline-get-detail.sh
b/scripts/pm/framework/pipeline-get-detail.sh
index 7153c67a2..081006e37 100755
--- a/scripts/pm/framework/pipeline-get-detail.sh
+++ b/scripts/pm/framework/pipeline-get-detail.sh
@@ -18,6 +18,6 @@
. "$(dirname $0)/../vars/active-vars.sh"
-pipeline_id=${1-"2"}
+pipeline_id=${1-"31"}
curl -sv $LAKE_ENDPOINT/pipelines/$pipeline_id | jq
diff --git a/scripts/pm/framework/pipeline-get-detail.sh
b/scripts/pm/framework/pipeline-rerun.sh
similarity index 90%
copy from scripts/pm/framework/pipeline-get-detail.sh
copy to scripts/pm/framework/pipeline-rerun.sh
index 7153c67a2..808d1d346 100755
--- a/scripts/pm/framework/pipeline-get-detail.sh
+++ b/scripts/pm/framework/pipeline-rerun.sh
@@ -18,6 +18,6 @@
. "$(dirname $0)/../vars/active-vars.sh"
-pipeline_id=${1-"2"}
+pipeline_id=${1-42}
-curl -sv $LAKE_ENDPOINT/pipelines/$pipeline_id | jq
+curl -sv -XPOST $LAKE_ENDPOINT/pipelines/$pipeline_id/rerun | jq
\ No newline at end of file
diff --git a/scripts/pm/framework/pipeline-get-detail.sh
b/scripts/pm/framework/task-rerun.sh
similarity index 91%
copy from scripts/pm/framework/pipeline-get-detail.sh
copy to scripts/pm/framework/task-rerun.sh
index 7153c67a2..1a565af42 100755
--- a/scripts/pm/framework/pipeline-get-detail.sh
+++ b/scripts/pm/framework/task-rerun.sh
@@ -18,6 +18,6 @@
. "$(dirname $0)/../vars/active-vars.sh"
-pipeline_id=${1-"2"}
+task_id=${1-63}
-curl -sv $LAKE_ENDPOINT/pipelines/$pipeline_id | jq
+curl -sv -XPOST $LAKE_ENDPOINT/tasks/$task_id/rerun | jq
\ No newline at end of file
diff --git a/scripts/pm/github/create-blueprint.sh
b/scripts/pm/github/blueprint-create.sh
similarity index 100%
rename from scripts/pm/github/create-blueprint.sh
rename to scripts/pm/github/blueprint-create.sh
diff --git a/scripts/pm/github/create-blueprint-v200.sh
b/scripts/pm/github/blueprint-v200-create.sh
similarity index 97%
rename from scripts/pm/github/create-blueprint-v200.sh
rename to scripts/pm/github/blueprint-v200-create.sh
index 8745c3720..ad8b8d843 100755
--- a/scripts/pm/github/create-blueprint-v200.sh
+++ b/scripts/pm/github/blueprint-v200-create.sh
@@ -29,7 +29,6 @@ curl -sv $LAKE_ENDPOINT/blueprints \
"isManual": true,
"mode": "NORMAL",
"name": "My GitHub Blueprint",
- "projectName": "$PROJECT_NAME",
"settings": {
"version": "2.0.0",
"skipOnFail": false,
@@ -45,6 +44,7 @@ curl -sv $LAKE_ENDPOINT/blueprints \
]
}
]
- }
+ },
+ "labels": ["foobar"]
}
JSON
diff --git a/scripts/pm/github/user.sh b/scripts/pm/github/gh-user-get.sh
similarity index 100%
rename from scripts/pm/github/user.sh
rename to scripts/pm/github/gh-user-get.sh
diff --git a/scripts/pm/github/trigger-pipline.sh
b/scripts/pm/github/pipeline-create.sh
similarity index 93%
rename from scripts/pm/github/trigger-pipline.sh
rename to scripts/pm/github/pipeline-create.sh
index 85965697a..900ea38df 100755
--- a/scripts/pm/github/trigger-pipline.sh
+++ b/scripts/pm/github/pipeline-create.sh
@@ -25,6 +25,7 @@ curl -sv $LAKE_ENDPOINT/pipelines --data @- <<JSON | jq
[
{
"plugin": "github",
+ "subtasks": ["collectApiIssues"],
"options": {
"connectionId": 1,
"owner": "apache",
@@ -32,6 +33,7 @@ curl -sv $LAKE_ENDPOINT/pipelines --data @- <<JSON | jq
}
}
]
- ]
+ ],
+ "labels": [ "foobar" ]
}
JSON
diff --git a/scripts/pm/github/put-repo.sh b/scripts/pm/github/repo-create.sh
similarity index 100%
rename from scripts/pm/github/put-repo.sh
rename to scripts/pm/github/repo-create.sh
diff --git a/scripts/pm/github/create-transformationrules.sh
b/scripts/pm/github/transformationrules-create.sh
similarity index 100%
rename from scripts/pm/github/create-transformationrules.sh
rename to scripts/pm/github/transformationrules-create.sh
diff --git a/services/base.go b/services/base.go
new file mode 100644
index 000000000..07e3a3f4c
--- /dev/null
+++ b/services/base.go
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import "github.com/apache/incubator-devlake/errors"
+
+// Pagination holds the paginate information
+type Pagination struct {
+ Page int `form:"page"`
+ PageSize int `form:"pageSize"`
+}
+
+// GetPage returns current page number
+func (p *Pagination) GetPage() int {
+ if p.Page < 1 {
+ return 1
+ }
+ return p.Page
+}
+
+// GetPageSize returns a sensible page size based on input
+func (p *Pagination) GetPageSize() int {
+ return p.GetPageSizeOr(50)
+}
+
+// GetPageSizeOr returns the page size or fallback to `defaultVal`
+func (p *Pagination) GetPageSizeOr(defaultVal int) int {
+ if p.PageSize < 1 {
+ return defaultVal
+ }
+ return p.PageSize
+}
+
+// GetSkip returns how many records should be skipped for specified page
+func (p *Pagination) GetSkip() int {
+ return (p.GetPage() - 1) * p.GetPageSize()
+}
+
+// VerifyStruct verifies given struct with `validator`
+func VerifyStruct(v interface{}) errors.Error {
+ err := vld.Struct(v)
+ if err != nil {
+ return errors.BadInput.Wrap(err, "data verification failed")
+ }
+ return nil
+}
diff --git a/services/blueprint.go b/services/blueprint.go
index c1f5c8491..5b287d268 100644
--- a/services/blueprint.go
+++ b/services/blueprint.go
@@ -19,7 +19,6 @@ package services
import (
"encoding/json"
- goerror "errors"
"fmt"
"strings"
@@ -27,24 +26,21 @@ import (
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/go-playground/validator/v10"
"github.com/robfig/cron/v3"
- "gorm.io/gorm"
)
// BlueprintQuery is a query for GetBlueprints
type BlueprintQuery struct {
+ Pagination
Enable *bool `form:"enable,omitempty"`
IsManual *bool `form:"is_manual"`
- Page int `form:"page"`
- PageSize int `form:"pageSize"`
Label string `form:"label"`
}
var (
blueprintLog = logger.Global.Nested("blueprint")
- vld = validator.New()
)
// CreateBlueprint accepts a Blueprint instance and insert it to database
@@ -92,7 +88,7 @@ func GetBlueprints(query *BlueprintQuery)
([]*models.Blueprint, int64, errors.Er
func GetBlueprint(blueprintId uint64) (*models.Blueprint, errors.Error) {
dbBlueprint, err := GetDbBlueprint(blueprintId)
if err != nil {
- if goerror.Is(err, gorm.ErrRecordNotFound) {
+ if db.IsErrorNotFound(err) {
return nil, errors.NotFound.New("blueprint not found")
}
return nil, errors.Internal.Wrap(err, "error getting the
blueprint from database")
@@ -113,7 +109,7 @@ func GetBlueprintByProjectName(projectName string)
(*models.Blueprint, errors.Er
dbBlueprint, err := GetDbBlueprintByProjectName(projectName)
if err != nil {
// Allow specific projectName to fail to find the corresponding
blueprint
- if goerror.Is(err, gorm.ErrRecordNotFound) {
+ if db.IsErrorNotFound(err) {
return nil, nil
}
return nil, errors.Internal.Wrap(err, fmt.Sprintf("error
getting the blueprint from database with project %s", projectName))
@@ -211,27 +207,6 @@ func saveBlueprint(blueprint *models.Blueprint)
(*models.Blueprint, errors.Error
return blueprint, nil
}
-// PatchBlueprintEnableByProjectName FIXME ...
-func PatchBlueprintEnableByProjectName(projectName string, enable bool)
(*models.Blueprint, errors.Error) {
- blueprint, err := GetBlueprintByProjectName(projectName)
- if err != nil {
- return nil, err
- }
-
- if blueprint == nil {
- return nil, errors.Default.New(fmt.Sprintf("do not surpport to
set enable for projectName:[%s] ,because it has no blueprint.", projectName))
- }
-
- blueprint.Enable = enable
-
- blueprint, err = saveBlueprint(blueprint)
- if err != nil {
- return nil, err
- }
-
- return blueprint, nil
-}
-
// PatchBlueprint FIXME ...
func PatchBlueprint(id uint64, body map[string]interface{})
(*models.Blueprint, errors.Error) {
// load record from db
@@ -258,19 +233,6 @@ func PatchBlueprint(id uint64, body
map[string]interface{}) (*models.Blueprint,
return blueprint, nil
}
-// DeleteBlueprint FIXME ...
-func DeleteBlueprint(id uint64) errors.Error {
- err := DeleteDbBlueprint(id)
- if err != nil {
- return errors.Internal.Wrap(err, fmt.Sprintf("error deleting
blueprint %d", id))
- }
- err = ReloadBlueprints(cronManager)
- if err != nil {
- return errors.Internal.Wrap(err, "error reloading blueprints")
- }
- return nil
-}
-
// ReloadBlueprints FIXME ...
func ReloadBlueprints(c *cron.Cron) errors.Error {
enable := true
@@ -358,9 +320,11 @@ func MakePlanForBlueprint(blueprint *models.Blueprint)
(core.PipelinePlan, error
// load project metric plugins and convert it to a map
metrics := make(map[string]json.RawMessage)
projectMetrics := make([]models.ProjectMetric, 0)
-
if blueprint.ProjectName != "" {
- db.Find(&projectMetrics, "project_name = ? AND enable =
?", blueprint.ProjectName, true)
+ err = db.All(&projectMetrics, dal.Where("project_name =
? AND enable = ?", blueprint.ProjectName, true))
+ if err != nil {
+ return nil, err
+ }
for _, projectMetric := range projectMetrics {
metrics[projectMetric.PluginName] =
json.RawMessage(projectMetric.PluginOption)
}
diff --git a/services/blueprint_helper.go b/services/blueprint_helper.go
index 1d8c07067..facd5f7b3 100644
--- a/services/blueprint_helper.go
+++ b/services/blueprint_helper.go
@@ -18,28 +18,27 @@ limitations under the License.
package services
import (
- goerror "errors"
"fmt"
"github.com/apache/incubator-devlake/config"
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/plugins/core"
- "gorm.io/gorm"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
)
// SaveDbBlueprint accepts a Blueprint instance and upsert it to database
func SaveDbBlueprint(dbBlueprint *models.DbBlueprint) errors.Error {
var err error
if dbBlueprint.ID != 0 {
- err = db.Save(&dbBlueprint).Error
+ err = db.Update(&dbBlueprint)
} else {
- err = db.Create(&dbBlueprint).Error
+ err = db.Create(&dbBlueprint)
}
if err != nil {
return errors.Default.Wrap(err, "error creating DB blueprint")
}
- err = db.Delete(&models.DbBlueprintLabel{}, `blueprint_id = ?`,
dbBlueprint.ID).Error
+ err = db.Delete(&models.DbBlueprintLabel{}, dal.Where(`blueprint_id =
?`, dbBlueprint.ID))
if err != nil {
return errors.Default.Wrap(err, "error delete DB blueprint's
old labelModels")
}
@@ -47,7 +46,7 @@ func SaveDbBlueprint(dbBlueprint *models.DbBlueprint)
errors.Error {
for i := range dbBlueprint.Labels {
dbBlueprint.Labels[i].BlueprintId = dbBlueprint.ID
}
- err = db.Create(&dbBlueprint.Labels).Error
+ err = db.Create(&dbBlueprint.Labels)
if err != nil {
return errors.Default.Wrap(err, "error creating DB
blueprint's labelModels")
}
@@ -57,45 +56,45 @@ func SaveDbBlueprint(dbBlueprint *models.DbBlueprint)
errors.Error {
// GetDbBlueprints returns a paginated list of Blueprints based on `query`
func GetDbBlueprints(query *BlueprintQuery) ([]*models.DbBlueprint, int64,
errors.Error) {
- dbBlueprints := make([]*models.DbBlueprint, 0)
- dbQuery := db.Model(dbBlueprints).Order("id DESC")
+ // process query parameters
+ clauses := []dal.Clause{dal.From(&models.DbBlueprint{})}
if query.Enable != nil {
- dbQuery = dbQuery.Where("enable = ?", *query.Enable)
+ clauses = append(clauses, dal.Where("enable = ?",
*query.Enable))
}
if query.IsManual != nil {
- dbQuery = dbQuery.Where("is_manual = ?", *query.IsManual)
+ clauses = append(clauses, dal.Where("is_manual = ?",
*query.IsManual))
}
if query.Label != "" {
- dbQuery = dbQuery.
- Joins(`left join _devlake_blueprint_labels ON
_devlake_blueprint_labels.blueprint_id = _devlake_blueprints.id`).
- Where(`_devlake_blueprint_labels.name = ?`, query.Label)
+ clauses = append(clauses,
+ dal.Join("left join _devlake_blueprint_labels bl ON
bl.blueprint_id = _devlake_blueprints.id"),
+ dal.Where("bl.name = ?", query.Label),
+ )
}
- var count int64
- err := dbQuery.Count(&count).Error
+ // count total records
+ count, err := db.Count(clauses...)
if err != nil {
- return nil, 0, errors.Default.Wrap(err, "error getting DB count
of blueprints")
+ return nil, 0, err
}
- dbQuery = processDbClausesWithPager(dbQuery, query.PageSize, query.Page)
-
- err = dbQuery.Find(&dbBlueprints).Error
+ // load paginated blueprints from database
+ clauses = append(clauses,
+ dal.Orderby("id DESC"),
+ dal.Offset(query.GetSkip()),
+ dal.Limit(query.GetPageSize()),
+ )
+ dbBlueprints := make([]*models.DbBlueprint, 0)
+ err = db.All(&dbBlueprints, clauses...)
if err != nil {
- return nil, 0, errors.Default.Wrap(err, "error finding DB
blueprints")
+ return nil, 0, errors.Default.Wrap(err, "error getting DB count
of blueprints")
}
- var blueprintIds []uint64
- for _, dbBlueprint := range dbBlueprints {
- blueprintIds = append(blueprintIds, dbBlueprint.ID)
- }
- var dbLabels []models.DbBlueprintLabel
- dbLabelsMap := map[uint64][]models.DbBlueprintLabel{}
- db.Where(`blueprint_id in ?`, blueprintIds).Find(&dbLabels)
- for _, dbLabel := range dbLabels {
- dbLabelsMap[dbLabel.BlueprintId] =
append(dbLabelsMap[dbLabel.BlueprintId], dbLabel)
- }
+ // load labels for blueprints
for _, dbBlueprint := range dbBlueprints {
- dbBlueprint.Labels = dbLabelsMap[dbBlueprint.ID]
+ err = fillBlueprintDetail(dbBlueprint)
+ if err != nil {
+ return nil, 0, err
+ }
}
return dbBlueprints, count, nil
@@ -104,16 +103,16 @@ func GetDbBlueprints(query *BlueprintQuery)
([]*models.DbBlueprint, int64, error
// GetDbBlueprint returns the detail of a given Blueprint ID
func GetDbBlueprint(dbBlueprintId uint64) (*models.DbBlueprint, errors.Error) {
dbBlueprint := &models.DbBlueprint{}
- err := db.First(dbBlueprint, dbBlueprintId).Error
+ err := db.First(dbBlueprint, dal.Where("id = ?", dbBlueprintId))
if err != nil {
- if goerror.Is(err, gorm.ErrRecordNotFound) {
+ if db.IsErrorNotFound(err) {
return nil, errors.NotFound.Wrap(err, "could not find
blueprint in DB")
}
return nil, errors.Default.Wrap(err, "error getting blueprint
from DB")
}
- err = db.Find(&dbBlueprint.Labels, "blueprint_id = ?",
dbBlueprint.ID).Error
+ err = fillBlueprintDetail(dbBlueprint)
if err != nil {
- return nil, errors.Internal.Wrap(err, "error getting the
blueprint labels from database")
+ return nil, err
}
return dbBlueprint, nil
}
@@ -121,43 +120,20 @@ func GetDbBlueprint(dbBlueprintId uint64)
(*models.DbBlueprint, errors.Error) {
// GetDbBlueprintByProjectName returns the detail of a given projectName
func GetDbBlueprintByProjectName(projectName string) (*models.DbBlueprint,
errors.Error) {
dbBlueprint := &models.DbBlueprint{}
- err := db.Where("project_name = ?",
projectName).First(dbBlueprint).Error
+ err := db.First(dbBlueprint, dal.Where("project_name = ?", projectName))
if err != nil {
- if goerror.Is(err, gorm.ErrRecordNotFound) {
+ if db.IsErrorNotFound(err) {
return nil, errors.NotFound.Wrap(err,
fmt.Sprintf("could not find blueprint in DB by projectName %s", projectName))
}
return nil, errors.Default.Wrap(err, fmt.Sprintf("error getting
blueprint from DB by projectName %s", projectName))
}
- err = db.Find(&dbBlueprint.Labels, "blueprint_id = ?",
dbBlueprint.ID).Error
+ err = fillBlueprintDetail(dbBlueprint)
if err != nil {
- return nil, errors.Internal.Wrap(err, "error getting the
blueprint labels from database")
+ return nil, err
}
return dbBlueprint, nil
}
-// RenameProjectNameForBlueprint FIXME ...
-func RenameProjectNameForBlueprint(oldProjectName string, newProjectName
string) errors.Error {
- err := db.Model(&models.DbBlueprint{}).
- Where("project_name = ?", oldProjectName).
- Updates(map[string]interface{}{
- "project_name": newProjectName,
- }).Error
- if err != nil {
- return errors.Default.Wrap(err, fmt.Sprintf("Failed to
RenameProjectNameForBlueprint from [%s] to [%s]", oldProjectName,
newProjectName))
- }
-
- return nil
-}
-
-// DeleteDbBlueprint deletes blueprint by id
-func DeleteDbBlueprint(id uint64) errors.Error {
- err := db.Delete(&models.DbBlueprint{}, "id = ?", id).Error
- if err != nil {
- return errors.Default.Wrap(err, "error deleting blueprint from
DB")
- }
- return nil
-}
-
// parseBlueprint
func parseBlueprint(dbBlueprint *models.DbBlueprint) *models.Blueprint {
labelList := []string{}
@@ -236,3 +212,11 @@ func decryptDbBlueprint(dbBlueprint *models.DbBlueprint)
(*models.DbBlueprint, e
}
return dbBlueprint, nil
}
+
+func fillBlueprintDetail(blueprint *models.DbBlueprint) errors.Error {
+ err := db.All(&blueprint.Labels, dal.Where("blueprint_id = ?",
blueprint.ID))
+ if err != nil {
+ return errors.Internal.Wrap(err, "error getting the blueprint
labels from database")
+ }
+ return nil
+}
diff --git a/services/blueprint_makeplan_v200.go
b/services/blueprint_makeplan_v200.go
index 904d6ef6f..eed7b53a2 100644
--- a/services/blueprint_makeplan_v200.go
+++ b/services/blueprint_makeplan_v200.go
@@ -20,6 +20,7 @@ package services
import (
"encoding/json"
"fmt"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/models/domainlayer/crossdomain"
@@ -39,18 +40,19 @@ func GeneratePlanJsonV200(
if err != nil {
return nil, err
}
- // refresh project_mapping table to reflect project/scopes relationship
+ // save scopes to database
if len(scopes) > 0 {
for _, scope := range scopes {
- e := basicRes.GetDal().CreateOrUpdate(scope)
- if e != nil {
+ err = db.CreateOrUpdate(scope)
+ if err != nil {
scopeInfo :=
fmt.Sprintf("[Id:%s][Name:%s][TableName:%s]", scope.ScopeId(),
scope.ScopeName(), scope.TableName())
- return nil, errors.Default.Wrap(e,
fmt.Sprintf("failed to create scopes:[%s]", scopeInfo))
+ return nil, errors.Default.Wrap(err,
fmt.Sprintf("failed to create scopes:[%s]", scopeInfo))
}
}
}
+ // refresh project_mapping table to reflect project/scopes relationship
if len(projectName) != 0 {
- err = basicRes.GetDal().Delete(&crossdomain.ProjectMapping{},
dal.Where("project_name = ?", projectName))
+ err = db.Delete(&crossdomain.ProjectMapping{},
dal.Where("project_name = ?", projectName))
if err != nil {
return nil, err
}
@@ -60,7 +62,7 @@ func GeneratePlanJsonV200(
Table: scope.TableName(),
RowId: scope.ScopeId(),
}
- err = basicRes.GetDal().CreateIfNotExist(projectMapping)
+ err = db.Create(projectMapping)
if err != nil {
return nil, err
}
diff --git a/services/dm_code.go b/services/dm_code.go
index 78540d365..b747b0c4a 100644
--- a/services/dm_code.go
+++ b/services/dm_code.go
@@ -20,20 +20,12 @@ package services
import (
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/models/domainlayer/code"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
)
// GetRepos FIXME ...
func GetRepos() ([]*code.Repo, int64, errors.Error) {
repos := make([]*code.Repo, 0)
- db := db.Model(repos).Order("id DESC")
- var count int64
- err := db.Count(&count).Error
- if err != nil {
- return nil, 0, errors.Convert(err)
- }
- err = db.Find(&repos).Error
- if err != nil {
- return nil, count, errors.Convert(err)
- }
- return repos, count, nil
+ err := db.All(&repos, dal.Orderby("id DESC"))
+ return repos, int64(len(repos)), err
}
diff --git a/services/init.go b/services/init.go
index 12f7bece3..c51808cce 100644
--- a/services/init.go
+++ b/services/init.go
@@ -23,8 +23,8 @@ import (
"sync"
"github.com/apache/incubator-devlake/errors"
+ "github.com/go-playground/validator/v10"
- "github.com/apache/incubator-devlake/config"
"github.com/apache/incubator-devlake/impl/dalgorm"
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/models/migrationscripts"
@@ -32,38 +32,33 @@ import (
"github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/runner"
"github.com/robfig/cron/v3"
- "gorm.io/gorm"
)
var cfg core.ConfigReader
var log core.Logger
-var db *gorm.DB
+var db dal.Dal
var basicRes core.BasicRes
var migrator core.Migrator
var cronManager *cron.Cron
var cronLocker sync.Mutex
+var vld *validator.Validate
const failToCreateCronJob = "created cron job failed"
// Init the services module
func Init() {
var err error
- cfg = config.GetConfig()
- log = logger.Global
- db, err = runner.NewGormDb(cfg, log)
- if err != nil {
- panic(err)
- }
- // TODO: this is ugly, the lockDb / CreateAppBasicRes are coupled via
global variables cfg/log
- // it is too much for this refactor, let's solve it later
+ // basic resources initialization
+ vld = validator.New()
+ basicRes = runner.CreateAppBasicRes()
+ cfg = basicRes.GetConfigReader()
+ log = basicRes.GetLogger()
+ db = basicRes.GetDal()
// lock the database to avoid multiple devlake instances from sharing
the same one
lockDb()
- // basic resources initialization
- basicRes = runner.CreateBasicRes(cfg, log, db)
-
// initialize db migrator
migrator, err = runner.InitMigrator(basicRes)
if err != nil {
@@ -72,17 +67,20 @@ func Init() {
log.Info("migration initialized")
migrator.Register(migrationscripts.All(), "Framework")
- // now,
- // load plugins
+ // now, load the plugins
err = runner.LoadPlugins(basicRes)
if err != nil {
panic(err)
}
+
+ // pull migration scripts from plugins to migrator
for pluginName, pluginInst := range core.AllPlugins() {
if migratable, ok := pluginInst.(core.PluginMigration); ok {
migrator.Register(migratable.MigrationScripts(),
pluginName)
}
}
+
+ // check if there are pending migration
forceMigration := cfg.GetBool("FORCE_MIGRATION")
if !migrator.HasPendingScripts() || forceMigration {
err = ExecuteMigration()
@@ -107,7 +105,8 @@ func ExecuteMigration() errors.Error {
if err != nil {
panic(err)
}
- // call service init
+
+ // initialize pipeline server, mainly to start the pipeline consuming
process
pipelineServiceInit()
return nil
}
diff --git a/services/notification.go b/services/notification.go
index a067a9e36..5c1c6495b 100644
--- a/services/notification.go
+++ b/services/notification.go
@@ -22,13 +22,14 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
- "github.com/apache/incubator-devlake/errors"
"io"
"math/rand"
"net/http"
"strings"
"time"
+ "github.com/apache/incubator-devlake/errors"
+
"github.com/apache/incubator-devlake/models"
)
@@ -78,7 +79,7 @@ func (n *NotificationService)
sendNotification(notificationType models.Notificat
nonce := randSeq(16)
notification.Nonce = nonce
- err = db.Save(¬ification).Error
+ err = db.Create(¬ification)
if err != nil {
return errors.Convert(err)
}
@@ -97,7 +98,7 @@ func (n *NotificationService)
sendNotification(notificationType models.Notificat
return errors.Convert(err)
}
notification.Response = string(respBody)
- return errors.Convert(db.Save(¬ification).Error)
+ return db.Update(notification)
}
func (n *NotificationService) signature(input, nouce string) string {
diff --git a/services/page_helper.go b/services/page_helper.go
deleted file mode 100644
index 6c80705ba..000000000
--- a/services/page_helper.go
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package services
-
-import "gorm.io/gorm"
-
-const maxPageSize = 100
-
-func processDbClausesWithPager(tx *gorm.DB, pageSize int, page int) *gorm.DB {
- if pageSize <= 0 || pageSize > maxPageSize {
- pageSize = maxPageSize
- }
- tx = tx.Limit(pageSize)
-
- if page > 0 {
- offset := pageSize * (page - 1)
- tx = tx.Offset(offset)
- }
- return tx
-}
diff --git a/services/pipeline.go b/services/pipeline.go
index 1ba8f2a75..d94daeb04 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -29,6 +29,8 @@ import (
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/models"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/utils"
"github.com/google/uuid"
v11 "go.temporal.io/api/enums/v1"
@@ -43,10 +45,9 @@ var globalPipelineLog = logger.Global.Nested("pipeline
service")
// PipelineQuery is a query for GetPipelines
type PipelineQuery struct {
+ Pagination
Status string `form:"status"`
Pending int `form:"pending"`
- Page int `form:"page"`
- PageSize int `form:"pageSize"`
BlueprintId uint64 `uri:"blueprintId" form:"blueprint_id"`
Label string `form:"label"`
}
@@ -73,8 +74,14 @@ func pipelineServiceInit() {
watchTemporalPipelines()
} else {
// standalone mode: reset pipeline status
- db.Model(&models.DbPipeline{}).Where("status <> ?",
models.TASK_COMPLETED).Update("status", models.TASK_FAILED)
- db.Model(&models.Task{}).Where("status <> ?",
models.TASK_COMPLETED).Update("status", models.TASK_FAILED)
+ err := db.UpdateColumn(
+ &models.DbPipeline{},
+ "status", models.TASK_FAILED,
+ dal.Where("status = ?", models.TASK_RUNNING),
+ )
+ if err != nil {
+ panic(err)
+ }
}
err := ReloadBlueprints(cronManager)
@@ -159,6 +166,7 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
sema := semaphore.NewWeighted(pipelineMaxParallel)
runningParallelLabels := []string{}
var runningParallelLabelLock sync.Mutex
+ dbPipeline := &models.DbPipeline{}
for {
globalPipelineLog.Info("acquire lock")
// start goroutine when sema lock ready and pipeline exist.
@@ -168,31 +176,45 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
panic(err)
}
globalPipelineLog.Info("get lock and wait next pipeline")
- dbPipeline := &models.DbPipeline{}
for {
cronLocker.Lock()
// prepare query to find an appropriate pipeline to
execute
- db.Where("status IN ?", []string{models.TASK_CREATED,
models.TASK_RERUN}).
- Joins(`left join _devlake_pipeline_labels ON
+ err := db.First(dbPipeline,
+ dal.Where("status IN ?",
[]string{models.TASK_CREATED, models.TASK_RERUN}),
+ dal.Join(
+ `left join _devlake_pipeline_labels ON
_devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND
_devlake_pipeline_labels.name
LIKE 'parallel/%' AND
- _devlake_pipeline_labels.name
in ?`, runningParallelLabels).
- Group(`id`).
-
Having(`count(_devlake_pipeline_labels.name)=0`).
- Select("id").
- Order("id ASC").Limit(1).Find(dbPipeline)
+ _devlake_pipeline_labels.name
in ?`,
+ runningParallelLabels,
+ ),
+ dal.Groupby("id"),
+
dal.Having("count(_devlake_pipeline_labels.name)=0"),
+ dal.Select("id"),
+ dal.Orderby("id ASC"),
+ dal.Limit(1),
+ )
+ if err != nil && !db.IsErrorNotFound(err) {
+ globalPipelineLog.Error(err, "dequeue failed")
+ }
cronLocker.Unlock()
- if dbPipeline.ID != 0 {
+ if !db.IsErrorNotFound(err) {
break
}
time.Sleep(time.Second)
}
- db.Model(&models.DbPipeline{}).Where("id = ?",
dbPipeline.ID).Updates(map[string]interface{}{
- "status": models.TASK_RUNNING,
- "message": "",
- "began_at": time.Now(),
- })
+ // mark the pipeline running
+ err = db.UpdateColumns(&models.DbPipeline{}, []dal.DalSet{
+ {ColumnName: "status", Value: models.TASK_RUNNING},
+ {ColumnName: "message", Value: ""},
+ {ColumnName: "began_at", Value: time.Now()},
+ }, dal.Where("id = ?", dbPipeline.ID))
+ if err != nil {
+ panic(err)
+ }
+
+ // load pipeline
dbPipeline, err = GetDbPipeline(dbPipeline.ID)
if err != nil {
panic(err)
@@ -234,7 +256,7 @@ func watchTemporalPipelines() {
for range ticker.C {
// load all running pipeline from database
runningDbPipelines := make([]models.DbPipeline, 0)
- err := db.Find(&runningDbPipelines, "status = ?",
models.TASK_RUNNING).Error
+ err := db.All(&runningDbPipelines, dal.Where("status =
?", models.TASK_RUNNING))
if err != nil {
panic(err)
}
@@ -277,11 +299,11 @@ func watchTemporalPipelines() {
}
}
rp.FinishedAt =
desc.WorkflowExecutionInfo.CloseTime
- err =
db.Model(rp).Updates(map[string]interface{}{
- "status": rp.Status,
- "message": rp.Message,
- "finished_at": rp.FinishedAt,
- }).Error
+ err = db.UpdateColumns(rp, []dal.DalSet{
+ {ColumnName: "status", Value:
rp.Status},
+ {ColumnName: "message", Value:
rp.Message},
+ {ColumnName: "finished_at",
Value: rp.FinishedAt},
+ })
if err != nil {
globalPipelineLog.Error(err,
"failed to update db: %v", err)
}
@@ -352,23 +374,25 @@ func CancelPipeline(pipelineId uint64) errors.Error {
cronLocker.Lock()
defer cronLocker.Unlock()
pipeline := &models.DbPipeline{}
- err := db.First(pipeline, pipelineId).Error
+ err := db.First(pipeline, dal.Where("id = ?", pipelineId))
if err != nil {
return errors.BadInput.New("pipeline not found")
}
if pipeline.Status == models.TASK_CREATED || pipeline.Status ==
models.TASK_RERUN {
pipeline.Status = models.TASK_CANCELLED
- result := db.Save(pipeline)
- if result.Error != nil {
- return errors.Default.Wrap(result.Error, "faile to
update pipeline")
+ err = db.Update(pipeline)
+ if err != nil {
+ return errors.Default.Wrap(err, "faile to update
pipeline")
}
// now, with RunPipelineInQueue being block and target pipeline
got updated
// we should update the related tasks as well
- result = db.Model(&models.Task{}).
- Where("pipeline_id = ?", pipelineId).
- Update("status", models.TASK_CANCELLED)
- if result.Error != nil {
- return errors.Default.Wrap(result.Error, "faile to
update pipeline tasks")
+ err = db.UpdateColumn(
+ &models.Task{},
+ "status", models.TASK_CANCELLED,
+ dal.Where("pipeline_id = ?", pipelineId),
+ )
+ if err != nil {
+ return errors.Default.Wrap(err, "faile to update
pipeline tasks")
}
// the target pipeline is pending, no running, no need to
perform the actual cancel operation
return nil
@@ -376,7 +400,7 @@ func CancelPipeline(pipelineId uint64) errors.Error {
if temporalClient != nil {
return
errors.Convert(temporalClient.CancelWorkflow(context.Background(),
getTemporalWorkflowId(pipelineId), ""))
}
- pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId,
Pending: 1, PageSize: -1})
+ pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId,
Pending: 1, Pagination: Pagination{PageSize: -1}})
if err != nil {
return errors.Convert(err)
}
@@ -402,3 +426,95 @@ func getPipelineLogsPath(pipeline *models.Pipeline)
(string, errors.Error) {
}
return "", errors.Default.Wrap(err, fmt.Sprintf("error validating logs
path for pipeline #%d", pipeline.ID))
}
+
+// RerunPipeline would rerun all failed tasks or specified task
+func RerunPipeline(pipelineId uint64, task *models.Task) ([]*models.Task,
errors.Error) {
+ // prevent pipeline executor from doing anything that might jeopardize
the integrity
+ cronLocker.Lock()
+ defer cronLocker.Unlock()
+
+ // load the pipeline
+ pipeline, err := GetPipeline(pipelineId)
+ if err != nil {
+ return nil, err
+ }
+
+ // verify the status
+ if pipeline.Status == models.TASK_RUNNING {
+ return nil, errors.BadInput.New("pipeline is running")
+ }
+ if pipeline.Status == models.TASK_CREATED || pipeline.Status ==
models.TASK_RERUN {
+ return nil, errors.BadInput.New("pipeline is waiting to run")
+ }
+
+ // determine which tasks to rerun
+ var failedTasks []*models.Task
+ if task != nil {
+ if task.PipelineId != pipelineId {
+ return nil, errors.BadInput.New("the task ID and
pipeline ID doesn't match")
+ }
+ failedTasks = append(failedTasks, task)
+ } else {
+ tasks, err := GetTasksWithLastStatus(pipelineId)
+ if err != nil {
+ return nil, errors.Default.Wrap(err, "error getting
tasks")
+ }
+ for _, t := range tasks {
+ if t.Status != models.TASK_COMPLETED {
+ failedTasks = append(failedTasks, t)
+ }
+ }
+ }
+
+ // no tasks to rerun
+ if len(failedTasks) == 0 {
+ return nil, errors.BadInput.New("no tasks to be re-ran")
+ }
+
+ // create new tasks
+ // TODO: this is better to be wrapped inside a transaction
+ rerunTasks := []*models.Task{}
+ for _, t := range failedTasks {
+ // mark previous task failed
+ t.Status = models.TASK_FAILED
+ err := db.UpdateColumn(t, "status", models.TASK_FAILED)
+ if err != nil {
+ return nil, err
+ }
+ // create new task
+ subtasks, err := t.GetSubTasks()
+ if err != nil {
+ return nil, err
+ }
+ options, err := t.GetOptions()
+ if err != nil {
+ return nil, err
+ }
+ rerunTask, err := CreateTask(&models.NewTask{
+ PipelineTask: &core.PipelineTask{
+ Plugin: t.Plugin,
+ Subtasks: subtasks,
+ Options: options,
+ },
+ PipelineId: t.PipelineId,
+ PipelineRow: t.PipelineRow,
+ PipelineCol: t.PipelineCol,
+ IsRerun: true,
+ })
+ if err != nil {
+ return nil, err
+ }
+ // append to result
+ rerunTasks = append(rerunTasks, rerunTask)
+ }
+
+ // mark pipline rerun
+ err = db.UpdateColumn(&models.DbPipeline{},
+ "status", models.TASK_RERUN,
+ dal.Where("id = ?", pipelineId),
+ )
+ if err != nil {
+ return nil, err
+ }
+ return rerunTasks, nil
+}
diff --git a/services/pipeline_helper.go b/services/pipeline_helper.go
index d9300e76b..4aea9279e 100644
--- a/services/pipeline_helper.go
+++ b/services/pipeline_helper.go
@@ -19,14 +19,12 @@ package services
import (
"encoding/json"
- goerror "errors"
"fmt"
- "github.com/apache/incubator-devlake/config"
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/plugins/core"
- "gorm.io/gorm"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
)
// ErrBlueprintRunning indicates there is a running pipeline with the
specified blueprint_id
@@ -37,8 +35,10 @@ func CreateDbPipeline(newPipeline *models.NewPipeline)
(*models.DbPipeline, erro
cronLocker.Lock()
defer cronLocker.Unlock()
if newPipeline.BlueprintId > 0 {
- var count int64
- err := db.Model(&models.DbPipeline{}).Where("blueprint_id = ?
AND status IN ?", newPipeline.BlueprintId,
models.PendingTaskStatus).Count(&count).Error
+ count, err := db.Count(
+ dal.From(&models.DbPipeline{}),
+ dal.Where("blueprint_id = ? AND status IN ?",
newPipeline.BlueprintId, models.PendingTaskStatus),
+ )
if err != nil {
return nil, errors.Default.Wrap(err, "query pipelines
error")
}
@@ -69,7 +69,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline)
(*models.DbPipeline, erro
}
// save pipeline to database
- if err := db.Create(&dbPipeline).Error; err != nil {
+ if err := db.Create(&dbPipeline); err != nil {
globalPipelineLog.Error(err, "create pipeline failed: %v", err)
return nil, errors.Internal.Wrap(err, "create pipeline failed")
}
@@ -82,7 +82,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline)
(*models.DbPipeline, erro
})
}
if len(dbPipeline.Labels) > 0 {
- if err := db.Create(&dbPipeline.Labels).Error; err != nil {
+ if err := db.Create(&dbPipeline.Labels); err != nil {
globalPipelineLog.Error(err, "create pipeline's
labelModels failed: %v", err)
return nil, errors.Internal.Wrap(err, "create
pipeline's labelModels failed")
}
@@ -117,9 +117,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline)
(*models.DbPipeline, erro
}
// update tasks state
- if err := db.Model(dbPipeline).Updates(map[string]interface{}{
- "total_tasks": dbPipeline.TotalTasks,
- }).Error; err != nil {
+ if err := db.Update(dbPipeline); err != nil {
globalPipelineLog.Error(err, "update pipline state failed: %v",
err)
return nil, errors.Internal.Wrap(err, "update pipline state
failed")
}
@@ -129,47 +127,48 @@ func CreateDbPipeline(newPipeline *models.NewPipeline)
(*models.DbPipeline, erro
// GetDbPipelines by query
func GetDbPipelines(query *PipelineQuery) ([]*models.DbPipeline, int64,
errors.Error) {
- dbPipelines := make([]*models.DbPipeline, 0)
- dbQuery := db.Model(dbPipelines).Order("id DESC")
+ // process query parameters
+ clauses := []dal.Clause{dal.From(&models.DbPipeline{})}
if query.BlueprintId != 0 {
- dbQuery = dbQuery.Where("blueprint_id = ?", query.BlueprintId)
+ clauses = append(clauses, dal.Where("blueprint_id = ?",
query.BlueprintId))
}
if query.Status != "" {
- dbQuery = dbQuery.Where("status = ?", query.Status)
+ clauses = append(clauses, dal.Where("status = ?", query.Status))
}
if query.Pending > 0 {
- dbQuery = dbQuery.Where("finished_at is null and status IN ?",
models.PendingTaskStatus)
+ clauses = append(clauses, dal.Where("finished_at is null and
status IN ?", models.PendingTaskStatus))
}
if query.Label != "" {
- dbQuery = dbQuery.
- Joins(`left join _devlake_pipeline_labels ON
_devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id`).
- Where(`_devlake_pipeline_labels.name = ?`, query.Label)
+ clauses = append(clauses,
+ dal.Join("LEFT JOIN _devlake_pipeline_labels bl ON
bl.pipeline_id = _devlake_pipelines.id"),
+ dal.Where("bl.name = ?", query.Label),
+ )
}
- var count int64
- err := dbQuery.Count(&count).Error
+
+ // count total records
+ count, err := db.Count(clauses...)
if err != nil {
- return nil, 0, errors.Default.Wrap(err, "error getting DB
pipelines count")
+ return nil, 0, err
}
- dbQuery = processDbClausesWithPager(dbQuery, query.PageSize, query.Page)
-
- err = dbQuery.Find(&dbPipelines).Error
+ // load paginated blueprints from database
+ clauses = append(clauses,
+ dal.Orderby("id DESC"),
+ dal.Offset(query.GetSkip()),
+ dal.Limit(query.GetPageSize()),
+ )
+ dbPipelines := make([]*models.DbPipeline, 0)
+ err = db.All(&dbPipelines, clauses...)
if err != nil {
- return nil, count, errors.Default.Wrap(err, "error finding DB
pipelines")
+ return nil, 0, errors.Default.Wrap(err, "error getting DB count
of pipelines")
}
- var pipelineIds []uint64
+ // load labels for blueprints
for _, dbPipeline := range dbPipelines {
- pipelineIds = append(pipelineIds, dbPipeline.ID)
- }
- dbLabels := []models.DbPipelineLabel{}
- db.Where(`pipeline_id in ?`, pipelineIds).Find(&dbLabels)
- dbLabelsMap := map[uint64][]models.DbPipelineLabel{}
- for _, dbLabel := range dbLabels {
- dbLabelsMap[dbLabel.PipelineId] =
append(dbLabelsMap[dbLabel.PipelineId], dbLabel)
- }
- for _, dbPipeline := range dbPipelines {
- dbPipeline.Labels = dbLabelsMap[dbPipeline.ID]
+ err = fillPipelineDetail(dbPipeline)
+ if err != nil {
+ return nil, 0, err
+ }
}
return dbPipelines, count, nil
@@ -178,16 +177,16 @@ func GetDbPipelines(query *PipelineQuery)
([]*models.DbPipeline, int64, errors.E
// GetDbPipeline by id
func GetDbPipeline(pipelineId uint64) (*models.DbPipeline, errors.Error) {
dbPipeline := &models.DbPipeline{}
- err := db.First(dbPipeline, pipelineId).Error
+ err := db.First(dbPipeline, dal.Where("id = ?", pipelineId))
if err != nil {
- if goerror.Is(err, gorm.ErrRecordNotFound) {
+ if db.IsErrorNotFound(err) {
return nil, errors.NotFound.New("pipeline not found")
}
return nil, errors.Internal.Wrap(err, "error getting the
pipeline from database")
}
- err = db.Find(&dbPipeline.Labels, "pipeline_id = ?", pipelineId).Error
+ err = fillPipelineDetail(dbPipeline)
if err != nil {
- return nil, errors.Internal.Wrap(err, "error getting the
pipeline from database")
+ return nil, err
}
return dbPipeline, nil
}
@@ -248,7 +247,7 @@ func parseDbPipeline(pipeline *models.Pipeline)
*models.DbPipeline {
// encryptDbPipeline encrypts dbPipeline.Plan
func encryptDbPipeline(dbPipeline *models.DbPipeline) (*models.DbPipeline,
errors.Error) {
- encKey := config.GetConfig().GetString(core.EncodeKeyEnvStr)
+ encKey := cfg.GetString(core.EncodeKeyEnvStr)
planEncrypt, err := core.Encrypt(encKey, dbPipeline.Plan)
if err != nil {
return nil, err
@@ -259,7 +258,7 @@ func encryptDbPipeline(dbPipeline *models.DbPipeline)
(*models.DbPipeline, error
// encryptDbPipeline decrypts dbPipeline.Plan
func decryptDbPipeline(dbPipeline *models.DbPipeline) (*models.DbPipeline,
errors.Error) {
- encKey := config.GetConfig().GetString(core.EncodeKeyEnvStr)
+ encKey := cfg.GetString(core.EncodeKeyEnvStr)
plan, err := core.Decrypt(encKey, dbPipeline.Plan)
if err != nil {
return nil, err
@@ -268,11 +267,10 @@ func decryptDbPipeline(dbPipeline *models.DbPipeline)
(*models.DbPipeline, error
return dbPipeline, nil
}
-// UpdateDbPipelineStatus update the status of pipeline
-func UpdateDbPipelineStatus(pipelineId uint64, status string) errors.Error {
- err := db.Model(&models.DbPipeline{}).Where("id = ?",
pipelineId).Update("status", status).Error
+func fillPipelineDetail(pipeline *models.DbPipeline) errors.Error {
+ err := db.All(&pipeline.Labels, dal.Where("pipeline_id = ?",
pipeline.ID))
if err != nil {
- return errors.Convert(err)
+ return errors.Internal.Wrap(err, "error getting the pipeline
labels from database")
}
return nil
}
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index 812b1123a..4347bdcb1 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -39,7 +39,7 @@ type pipelineRunner struct {
func (p *pipelineRunner) runPipelineStandalone() errors.Error {
return runner.RunPipeline(
- runner.CreateBasicRes(cfg, p.logger, db),
+ basicRes.ReplaceLogger(p.logger),
p.pipeline.ID,
func(taskIds []uint64) errors.Error {
return RunTasksStandalone(p.logger, taskIds)
@@ -132,10 +132,10 @@ func runPipeline(pipelineId uint64) errors.Error {
dbPipeline.Status = models.TASK_COMPLETED
dbPipeline.Message = ""
}
- dbe := db.Model(dbPipeline).Updates(dbPipeline).Error
- if dbe != nil {
- globalPipelineLog.Error(dbe, "update pipeline state failed")
- return errors.Convert(dbe)
+ err = db.Update(dbPipeline)
+ if err != nil {
+ globalPipelineLog.Error(err, "update pipeline state failed")
+ return errors.Convert(err)
}
// notify external webhook
return NotifyExternal(pipelineId)
diff --git a/services/project.go b/services/project.go
index 5d705ced5..0063ec355 100644
--- a/services/project.go
+++ b/services/project.go
@@ -22,286 +22,275 @@ import (
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/models"
- "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/models/domainlayer/crossdomain"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
)
// ProjectQuery used to query projects as the api project input
type ProjectQuery struct {
- Page int `form:"page"`
- PageSize int `form:"pageSize"`
+ Pagination
}
-// CreateProject accepts a project instance and insert it to database
-func CreateProject(project *models.Project) errors.Error {
- if project.Name == "" {
- return errors.Default.New("can not use empty name for project")
+// GetProjects returns a paginated list of Projects based on `query`
+func GetProjects(query *ProjectQuery) ([]*models.Project, int64, errors.Error)
{
+ // verify input
+ if err := VerifyStruct(query); err != nil {
+ return nil, 0, err
}
-
- /*project, err := encryptProject(project)
- if err != nil {
- return err
- }*/
- err := CreateDbProject(project)
- if err != nil {
- return err
+ clauses := []dal.Clause{
+ dal.From(&models.Project{}),
}
- return nil
-}
-// CreateProjectMetric accepts a ProjectMetric instance and insert it to
database
-func CreateProjectMetric(projectMetric *models.ProjectMetric) errors.Error {
- /*enProjectMetric, err := encryptProjectMetric(projectMetric)
+ count, err := db.Count(clauses...)
if err != nil {
- return err
- }*/
- err := CreateDbProjectMetric(projectMetric)
- if err != nil {
- return err
+ return nil, 0, errors.Default.Wrap(err, "error getting DB count
of project")
}
- return nil
-}
-// GetProject returns a Project
-func GetProject(name string) (*models.Project, errors.Error) {
- project, err := GetDbProject(name)
+ clauses = append(clauses,
+ dal.Orderby("created_at DESC"),
+ dal.Offset(query.GetSkip()),
+ dal.Limit(query.GetPageSize()),
+ )
+ projects := make([]*models.Project, 0)
+ err = db.All(&projects, clauses...)
if err != nil {
- return nil, errors.Convert(err)
+ return nil, 0, errors.Default.Wrap(err, "error finding DB
project")
}
- /*project, err = decryptProject(project)
- if err != nil {
- return nil, errors.Convert(err)
- }*/
-
- return project, nil
+ return projects, count, nil
}
-// GetProjectMetric returns a ProjectMetric
-func GetProjectMetric(projectName string, pluginName string)
(*models.ProjectMetric, errors.Error) {
- projectMetric, err := GetDbProjectMetric(projectName, pluginName)
- if err != nil {
- return nil, errors.Convert(err)
+// CreateProject accepts a project instance and insert it to database
+func CreateProject(projectInput *models.ApiInputProject)
(*models.ApiOutputProject, errors.Error) {
+ // verify input
+ if err := VerifyStruct(projectInput); err != nil {
+ return nil, err
}
- /*projectMetric, err = decryptProjectMetric(projectMetric)
- if err != nil {
- return nil, errors.Convert(err)
- }*/
-
- return projectMetric, nil
-}
+ // create transaction to updte multiple tables
+ var err errors.Error
+ tx := db.Begin()
+ defer func() {
+ if r := recover(); r != nil || err != nil {
+ err = tx.Rollback()
+ if err != nil {
+ log.Error(err, "PatchProject: failed to
rollback")
+ }
+ }
+ }()
-// FlushProjectMetrics remove all Project metrics by project name and create
new metrics by baseMetrics
-func FlushProjectMetrics(projectName string, baseMetrics *[]models.BaseMetric)
errors.Error {
- err := removeAllDbProjectMetricsByProjectName(projectName)
+ // create project first
+ project := &models.Project{}
+ project.BaseProject = projectInput.BaseProject
+ err = db.Create(project)
if err != nil {
- return errors.Default.Wrap(err, fmt.Sprintf("error to
removeAllDbProjectMetricsByProjectName for %s", projectName))
- }
-
- for _, baseMetric := range *baseMetrics {
- err = CreateProjectMetric(&models.ProjectMetric{
- BaseProjectMetric: models.BaseProjectMetric{
- ProjectName: projectName,
- BaseMetric: baseMetric,
- },
- })
- if err != nil {
- return errors.Default.Wrap(err, fmt.Sprintf("failed to
CreateProjectMetric for [%s][%s]", projectName, baseMetric.PluginName))
+ if db.IsDuplicationError(err) {
+ return nil, errors.BadInput.New(fmt.Sprintf("A project
with name [%s] already exists", project.Name))
}
+ return nil, errors.Default.Wrap(err, "error creating DB
project")
}
- return nil
-}
-
-// LoadBluePrintAndMetrics load the blueprint and ProjectMetrics for
projectOutputv
-func LoadBluePrintAndMetrics(projectOutput *models.ApiOutputProject)
errors.Error {
- var err errors.Error
-
- // load Metrics
- projectMetrics, count, err := GetProjectMetrics(projectOutput.Name)
- if err != nil {
- return errors.Default.Wrap(err, "Failed to get project metrics
by project")
- }
- if count == 0 {
- projectOutput.Metrics = nil
- } else {
- baseMetric := make([]models.BaseMetric, len(*projectMetrics))
- for i, projectMetric := range *projectMetrics {
- baseMetric[i] = projectMetric.BaseMetric
+ // check if need flush the Metrics
+ if projectInput.Metrics != nil {
+ err = refreshProjectMetrics(tx, projectInput)
+ if err != nil {
+ return nil, err
}
- projectOutput.Metrics = &baseMetric
}
- // load blueprint
- projectOutput.Blueprint, err =
GetBlueprintByProjectName(projectOutput.Name)
+ // all good, commit transaction
+ err = tx.Commit()
if err != nil {
- return errors.Default.Wrap(err, "Error to get blueprint by
project")
+ return nil, err
}
- return nil
+ return makeProjectOutput(&projectInput.BaseProject)
}
-// GetProjectMetrics returns all ProjectMetric of the project
-func GetProjectMetrics(projectName string) (*[]models.ProjectMetric, int64,
errors.Error) {
- projectMetrics, count, err := GetDbProjectMetrics(projectName)
- if err != nil {
- return nil, 0, errors.Convert(err)
+// GetProject returns a Project
+func GetProject(name string) (*models.ApiOutputProject, errors.Error) {
+ // verify input
+ if name == "" {
+ return nil, errors.BadInput.New("project name is missing")
}
- /*for i, projectMetric := range projectMetrics {
- projectMetrics[i], err = decryptProjectMetric(projectMetric)
- if err != nil {
- return nil, 0, err
- }
- }*/
-
- return projectMetrics, count, nil
-}
-
-// GetProjects returns a paginated list of Projects based on `query`
-func GetProjects(query *ProjectQuery) ([]*models.Project, int64, errors.Error)
{
- projects, count, err := GetDbProjects(query)
+ // load project
+ project := &models.Project{}
+ err := db.First(project, dal.Where("name = ?", name))
if err != nil {
- return nil, 0, errors.Convert(err)
- }
-
- /*for i, project := range projects {
- projects[i], err = decryptProject(project)
- if err != nil {
- return nil, 0, err
+ if db.IsErrorNotFound(err) {
+ return nil, errors.NotFound.Wrap(err,
fmt.Sprintf("could not find project [%s] in DB", name))
}
- }*/
+ return nil, errors.Default.Wrap(err, "error getting project
from DB")
+ }
- return projects, count, nil
+ // convert to api output
+ return makeProjectOutput(&project.BaseProject)
}
// PatchProject FIXME ...
func PatchProject(name string, body map[string]interface{})
(*models.ApiOutputProject, errors.Error) {
projectInput := &models.ApiInputProject{}
- projectOutput := &models.ApiOutputProject{}
- // load record from db
- project, err := GetProject(name)
+ // load input
+ err := helper.DecodeMapStruct(body, projectInput)
if err != nil {
return nil, err
}
- err = helper.DecodeMapStruct(body, projectInput)
+ // wrap all operation inside a transaction
+ tx := db.Begin()
+ defer func() {
+ if r := recover(); r != nil || err != nil {
+ err = tx.Rollback()
+ if err != nil {
+ log.Error(err, "PatchProject: failed to
rollback")
+ }
+ }
+ }()
+
+ project := &models.Project{}
+ err = tx.First(project, dal.Where("name = ?", name), dal.Lock(true,
false))
if err != nil {
return nil, err
}
+
// allowed to changed the name
if projectInput.Name == "" {
projectInput.Name = name
}
project.BaseProject = projectInput.BaseProject
- /*enProject, err := encryptProject(project)
- if err != nil {
- return nil, err
- }*/
-
- // check if the name has changed, with the project name changing, we
have to change the other table at the same time as follows
+ // name changed, updates the related entities as well
if name != project.Name {
- //project name
- err = RenameProjectName(name, project.Name)
+ // ProjectMetric
+ err = tx.UpdateColumn(
+ &models.ProjectMetric{},
+ "project_name", project.Name,
+ dal.Where("project_name = ?", name),
+ )
if err != nil {
return nil, err
}
- //ProjectMetric
- err = RenameProjectNameForProjectMetric(name, project.Name)
+ // ProjectPrMetric
+ err = tx.UpdateColumn(
+ &crossdomain.ProjectPrMetric{},
+ "project_name", project.Name,
+ dal.Where("project_name = ?", name),
+ )
if err != nil {
return nil, err
}
- //ProjectPrMetric
- err = RenameProjectNameForProjectPrMetric(name, project.Name)
+ // ProjectIssueMetric
+ err = tx.UpdateColumn(
+ &crossdomain.ProjectIssueMetric{},
+ "project_name", project.Name,
+ dal.Where("project_name = ?", name),
+ )
if err != nil {
return nil, err
}
- //ProjectMapping
- err = RenameProjectNameForProjectIssueMetric(name, project.Name)
+ // ProjectMapping
+ err = tx.UpdateColumn(
+ &crossdomain.ProjectMapping{},
+ "project_name", project.Name,
+ dal.Where("project_name = ?", name),
+ )
if err != nil {
return nil, err
}
- //Blueprint
- err = RenameProjectNameForBlueprint(name, project.Name)
+ // Blueprint
+ err = tx.UpdateColumn(
+ &models.DbBlueprint{},
+ "project_name", project.Name,
+ dal.Where("project_name = ?", name),
+ )
if err != nil {
return nil, err
}
-
- // rename the project for each plugin
- err = core.TraversalPlugin(func(name string, plugin
core.PluginMeta) errors.Error {
- if handle, ok := plugin.(core.PluginHandle); ok {
- return handle.RenameProjectName(name,
project.Name)
- }
- return nil
- })
- if err != nil {
- return nil, errors.Internal.Wrap(err, "error to rename
project name for plugins")
- }
}
- // save
- err = SaveDbProject(project)
+ // Blueprint
+ err = tx.UpdateColumn(
+ &models.DbBlueprint{},
+ "enable", projectInput.Enable,
+ dal.Where("project_name = ?", name),
+ )
if err != nil {
- return nil, errors.Internal.Wrap(err, "error saving project")
+ return nil, err
}
- // check if need to changed the blueprint setting
- if projectInput.Enable != nil {
- _, err = PatchBlueprintEnableByProjectName(projectInput.Name,
*projectInput.Enable)
+ // refresh project metrics if needed
+ if projectInput.Metrics != nil {
+ err = refreshProjectMetrics(tx, projectInput)
if err != nil {
- return nil, errors.Default.Wrap(err, "Failed to set if
project enable")
+ return nil, err
}
}
- // check if need flush the Metrics
- if projectInput.Metrics != nil {
- err = FlushProjectMetrics(projectInput.Name,
projectInput.Metrics)
- if err != nil {
- return nil, errors.Default.Wrap(err, "Failed to flush
project metrics")
- }
+ // update project itself
+ err = tx.Update(project)
+ if err != nil {
+ return nil, err
}
- projectOutput.BaseProject = projectInput.BaseProject
- err = LoadBluePrintAndMetrics(projectOutput)
+ // commit the transaction
+ err = tx.Commit()
if err != nil {
- return nil, errors.Default.Wrap(err, fmt.Sprintf("Failed to
LoadBluePrintAndMetrics on PatchProject for %s", projectOutput.Name))
+ return nil, err
}
- // done
- return projectOutput, nil
+ // all good, render output
+ return makeProjectOutput(&projectInput.BaseProject)
}
-// PatchProjectMetric FIXME ...
-func PatchProjectMetric(projectName string, pluginName string, body
map[string]interface{}) (*models.ProjectMetric, errors.Error) {
- // load record from db
- projectMetric, err := GetDbProjectMetric(projectName, pluginName)
+func refreshProjectMetrics(tx dal.Transaction, projectInput
*models.ApiInputProject) errors.Error {
+ err := tx.Delete(&models.ProjectMetric{}, dal.Where("project_name = ?",
projectInput.Name))
if err != nil {
- return nil, err
+ return err
}
- err = helper.DecodeMapStruct(body, projectMetric)
- if err != nil {
- return nil, err
+ for _, baseMetric := range *projectInput.Metrics {
+ err = tx.Create(&models.ProjectMetric{
+ BaseProjectMetric: models.BaseProjectMetric{
+ ProjectName: projectInput.Name,
+ BaseMetric: baseMetric,
+ },
+ })
+ if err != nil {
+ return err
+ }
}
+ return nil
+}
- /*enProjectMetric, err := encryptProjectMetric(projectMetric)
+func makeProjectOutput(baseProject *models.BaseProject)
(*models.ApiOutputProject, errors.Error) {
+ projectOutput := &models.ApiOutputProject{}
+ projectOutput.BaseProject = *baseProject
+ // load project metrics
+ projectMetrics := make([]models.ProjectMetric, 0)
+ err := db.All(&projectMetrics, dal.Where("project_name = ?",
projectOutput.Name))
if err != nil {
- return nil, err
- }*/
+ return nil, errors.Default.Wrap(err, "failed to load project
metrics")
+ }
+ // convert metric to api output
+ if len(projectMetrics) > 0 {
+ baseMetric := make([]models.BaseMetric, len(projectMetrics))
+ for i, projectMetric := range projectMetrics {
+ baseMetric[i] = projectMetric.BaseMetric
+ }
+ projectOutput.Metrics = &baseMetric
+ }
- // save
- err = SaveDbProjectMetric(projectMetric)
+ // load blueprint
+ projectOutput.Blueprint, err =
GetBlueprintByProjectName(projectOutput.Name)
if err != nil {
- return nil, errors.Internal.Wrap(err, "error saving project")
+ return nil, errors.Default.Wrap(err, "Error to get blueprint by
project")
}
-
- // done
- return projectMetric, nil
+ return projectOutput, err
}
diff --git a/services/project_helper.go b/services/project_helper.go
deleted file mode 100644
index 09e2e8d3f..000000000
--- a/services/project_helper.go
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package services
-
-import (
- goerror "errors"
- "fmt"
- "strings"
-
- "github.com/apache/incubator-devlake/errors"
- "github.com/apache/incubator-devlake/models"
- "github.com/apache/incubator-devlake/models/domainlayer/crossdomain"
- "gorm.io/gorm"
- "gorm.io/gorm/clause"
-)
-
-// CreateDbProject accepts a project instance and insert it to database
-func CreateDbProject(project *models.Project) errors.Error {
- err := db.Create(project).Error
- if err != nil {
- if strings.Contains(strings.ToLower(err.Error()), "duplicate") {
- return errors.BadInput.New(fmt.Sprintf("The project
[%s] already exists,cannot be created again", project.Name))
- }
- return errors.Default.Wrap(err, "error creating DB project")
- }
- return nil
-}
-
-// CreateDbProjectMetric accepts a project metric instance and insert it to
database
-func CreateDbProjectMetric(projectMetric *models.ProjectMetric) errors.Error {
- err := db.Create(projectMetric).Error
- if err != nil {
- return errors.Default.Wrap(err, "error creating DB project
metric")
- }
- return nil
-}
-
-// SaveDbProject save a project instance and update it to database
-func SaveDbProject(project *models.Project) errors.Error {
- err := db.Save(project).Error
- if err != nil {
- return errors.Default.Wrap(err, "error saving DB project")
- }
- return nil
-}
-
-// SaveDbProjectMetric save a project instance and update it to database
-func SaveDbProjectMetric(projectMetric *models.ProjectMetric) errors.Error {
- err := db.Save(projectMetric).Error
- if err != nil {
- return errors.Default.Wrap(err, "error saving DB project
metric")
- }
- return nil
-}
-
-// GetDbProjects returns a paginated list of Project based on `query`
-func GetDbProjects(query *ProjectQuery) ([]*models.Project, int64,
errors.Error) {
- projects := make([]*models.Project, 0)
- db := db.Model(projects).Order("created_at desc")
-
- var count int64
- err := db.Count(&count).Error
- if err != nil {
- return nil, 0, errors.Default.Wrap(err, "error getting DB count
of project")
- }
- db = processDbClausesWithPager(db, query.PageSize, query.Page)
-
- err = db.Find(&projects).Error
- if err != nil {
- return nil, 0, errors.Default.Wrap(err, "error finding DB
project")
- }
-
- return projects, count, nil
-}
-
-// GetDbProject returns the detail of a given project name
-func GetDbProject(name string) (*models.Project, errors.Error) {
- project := &models.Project{}
- project.Name = name
-
- err := db.First(project).Error
- if err != nil {
- if goerror.Is(err, gorm.ErrRecordNotFound) {
- return nil, errors.NotFound.Wrap(err,
fmt.Sprintf("could not find project [%s] in DB", name))
- }
- return nil, errors.Default.Wrap(err, "error getting project
from DB")
- }
-
- return project, nil
-}
-
-// GetDbProjectMetric returns the detail of a given project name
-func GetDbProjectMetric(projectName string, pluginName string)
(*models.ProjectMetric, errors.Error) {
- projectMetric := &models.ProjectMetric{}
- projectMetric.ProjectName = projectName
- projectMetric.PluginName = pluginName
-
- err := db.First(projectMetric).Error
- if err != nil {
- if goerror.Is(err, gorm.ErrRecordNotFound) {
- return nil, errors.NotFound.Wrap(err,
fmt.Sprintf("could not find project metric [%s][%s] in DB", projectName,
pluginName))
- }
- return nil, errors.Default.Wrap(err, "error getting project
metric from DB")
- }
-
- return projectMetric, nil
-}
-
-// GetDbProjectMetrics returns all of Metrics of a given project name
-func GetDbProjectMetrics(projectName string) (*[]models.ProjectMetric, int64,
errors.Error) {
- projectMetrics := make([]models.ProjectMetric, 0)
- db := db.Table(models.ProjectMetric{}.TableName()).Where("project_name
= ?", projectName)
-
- var count int64
- err := db.Count(&count).Error
- if err != nil {
- return nil, 0, errors.Default.Wrap(err, fmt.Sprintf("could not
get project metric count for projectName [%s] in DB", projectName))
- }
-
- if count == 0 {
- return nil, 0, nil
- }
-
- err = db.Find(&projectMetrics).Error
- if err != nil {
- return nil, 0, errors.Default.Wrap(err, fmt.Sprintf("could not
find project metric for projectName [%s] in DB", projectName))
- }
-
- return &projectMetrics, count, nil
-}
-
-// RenameProjectName FIXME ...
-func RenameProjectName(oldProjectName string, newProjectName string)
errors.Error {
- err := db.Exec(
- "UPDATE ? SET name = ? WHERE name = ?",
- clause.Table{Name: models.Project{}.TableName()},
- newProjectName,
- oldProjectName,
- ).Error
- if err != nil {
- return errors.Default.Wrap(err, fmt.Sprintf("Failed to
RenameProjectNameForProjectMetric from [%s] to [%s]", oldProjectName,
newProjectName))
- }
-
- return nil
-}
-
-// RenameProjectNameForProjectMetric FIXME ...
-func RenameProjectNameForProjectMetric(oldProjectName string, newProjectName
string) errors.Error {
- err := db.Model(&models.ProjectMetric{}).
- Where("project_name = ?", oldProjectName).
- Updates(map[string]interface{}{
- "project_name": newProjectName,
- }).Error
- if err != nil {
- return errors.Default.Wrap(err, fmt.Sprintf("Failed to
RenameProjectNameForProjectMetric from [%s] to [%s]", oldProjectName,
newProjectName))
- }
-
- return nil
-}
-
-// RenameProjectNameForProjectPrMetric FIXME ...
-func RenameProjectNameForProjectPrMetric(oldProjectName string, newProjectName
string) errors.Error {
- err := db.Model(&crossdomain.ProjectPrMetric{}).
- Where("project_name = ?", oldProjectName).
- Updates(map[string]interface{}{
- "project_name": newProjectName,
- }).Error
- if err != nil {
- return errors.Default.Wrap(err, fmt.Sprintf("Failed to
RenameProjectNameForProjectPrMetric from [%s] to [%s]", oldProjectName,
newProjectName))
- }
-
- return nil
-}
-
-// RenameProjectNameForProjectIssueMetric FIXME ...
-func RenameProjectNameForProjectIssueMetric(oldProjectName string,
newProjectName string) errors.Error {
- err := db.Model(&crossdomain.ProjectIssueMetric{}).
- Where("project_name = ?", oldProjectName).
- Updates(map[string]interface{}{
- "project_name": newProjectName,
- }).Error
- if err != nil {
- return errors.Default.Wrap(err, fmt.Sprintf("Failed to
RenameProjectNameForProjectIssueMetric from [%s] to [%s]", oldProjectName,
newProjectName))
- }
-
- return nil
-}
-
-// RenameProjectNameForProjectMapping FIXME ...
-func RenameProjectNameForProjectMapping(oldProjectName string, newProjectName
string) errors.Error {
- err := db.Model(&crossdomain.ProjectMapping{}).
- Where("project_name = ?", oldProjectName).
- Updates(map[string]interface{}{
- "project_name": newProjectName,
- }).Error
- if err != nil {
- return errors.Default.Wrap(err, fmt.Sprintf("Failed to
RenameProjectNameForProjectMapping from [%s] to [%s]", oldProjectName,
newProjectName))
- }
-
- return nil
-}
-
-func removeAllDbProjectMetricsByProjectName(projectName string) errors.Error {
- err := db.Delete(&models.ProjectMetric{}, "project_name = ?",
projectName).Error
- if err != nil {
- return errors.Default.Wrap(err, "error deleting ProjectMetrics
from DB")
- }
- return nil
-}
-
-// encryptProject
-/*func encryptProject(project *models.Project) (*models.Project, errors.Error)
{
- encKey := config.GetConfig().GetString(core.EncodeKeyEnvStr)
-
- describeEncrypt, err := core.Encrypt(encKey, project.Description)
- if err != nil {
- return nil, err
- }
- project.Description = describeEncrypt
-
- return project, nil
-}
-
-// encryptProjectMetric
-func encryptProjectMetric(projectMetric *models.ProjectMetric)
(*models.ProjectMetric, errors.Error) {
- encKey := config.GetConfig().GetString(core.EncodeKeyEnvStr)
-
- pluginOption, err := core.Encrypt(encKey, projectMetric.PluginOption)
- if err != nil {
- return nil, err
- }
- projectMetric.PluginOption = pluginOption
-
- return projectMetric, nil
-}*/
-
-// decryptProject
-/*func decryptProject(project *models.Project) (*models.Project, errors.Error)
{
- encKey := config.GetConfig().GetString(core.EncodeKeyEnvStr)
-
- describe, err := core.Decrypt(encKey, project.Description)
- if err != nil {
- return nil, err
- }
- project.Description = describe
-
- return project, nil
-}
-
-// decryptProjectMetric
-func decryptProjectMetric(projectMetric *models.ProjectMetric)
(*models.ProjectMetric, errors.Error) {
- encKey := config.GetConfig().GetString(core.EncodeKeyEnvStr)
-
- pluginOption, err := core.Decrypt(encKey, projectMetric.PluginOption)
- if err != nil {
- return nil, err
- }
- projectMetric.PluginOption = pluginOption
-
- return projectMetric, nil
-}*/
diff --git a/services/pushapi.go b/services/pushapi.go
index 85e850190..1269bc46a 100644
--- a/services/pushapi.go
+++ b/services/pushapi.go
@@ -17,13 +17,16 @@ limitations under the License.
package services
-import "github.com/apache/incubator-devlake/errors"
+import (
+ "github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
+)
// InsertRow FIXME ...
func InsertRow(table string, rows []map[string]interface{}) (int64,
errors.Error) {
- tx := db.Table(table).Create(rows)
- if tx.Error != nil {
- return 0, errors.Convert(tx.Error)
+ err := db.Create(rows, dal.From(table))
+ if err != nil {
+ return 0, err
}
- return tx.RowsAffected, nil
+ return 1, nil
}
diff --git a/services/task.go b/services/task.go
index 5484d48af..0630e0ddc 100644
--- a/services/task.go
+++ b/services/task.go
@@ -18,123 +18,31 @@ limitations under the License.
package services
import (
- "context"
"encoding/json"
- goerror "errors"
"fmt"
"regexp"
- "strconv"
"strings"
- "sync"
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/models"
- "github.com/apache/incubator-devlake/models/common"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/runner"
- "gorm.io/gorm"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
)
var taskLog = logger.Global.Nested("task service")
var activityPattern = regexp.MustCompile(`task #(\d+)`)
-// RunningTaskData FIXME ...
-type RunningTaskData struct {
- Cancel context.CancelFunc
- ProgressDetail *models.TaskProgressDetail
-}
-
-// RunningTask FIXME ...
-type RunningTask struct {
- mu sync.Mutex
- tasks map[uint64]*RunningTaskData
-}
-
-// Add FIXME ...
-func (rt *RunningTask) Add(taskId uint64, cancel context.CancelFunc)
errors.Error {
- rt.mu.Lock()
- defer rt.mu.Unlock()
- if _, ok := rt.tasks[taskId]; ok {
- return errors.Default.New(fmt.Sprintf("task with id %d already
running", taskId))
- }
- rt.tasks[taskId] = &RunningTaskData{
- Cancel: cancel,
- ProgressDetail: &models.TaskProgressDetail{},
- }
- return nil
-}
-
-func (rt *RunningTask) setAll(progressDetails
map[uint64]*models.TaskProgressDetail) {
- rt.mu.Lock()
- defer rt.mu.Unlock()
- // delete finished tasks
- for taskId := range rt.tasks {
- if _, ok := progressDetails[taskId]; !ok {
- delete(rt.tasks, taskId)
- }
- }
- for taskId, progressDetail := range progressDetails {
- if _, ok := rt.tasks[taskId]; !ok {
- rt.tasks[taskId] = &RunningTaskData{}
- }
- rt.tasks[taskId].ProgressDetail = progressDetail
- }
-}
-
-// FillProgressDetailToTasks lock less times than GetProgressDetail
-func (rt *RunningTask) FillProgressDetailToTasks(tasks []models.Task) {
- rt.mu.Lock()
- defer rt.mu.Unlock()
-
- for index, task := range tasks {
- taskId := task.ID
- if task, ok := rt.tasks[taskId]; ok {
- tasks[index].ProgressDetail = task.ProgressDetail
- }
- }
-}
-
-// GetProgressDetail FIXME ...
-func (rt *RunningTask) GetProgressDetail(taskId uint64)
*models.TaskProgressDetail {
- rt.mu.Lock()
- defer rt.mu.Unlock()
-
- if task, ok := rt.tasks[taskId]; ok {
- return task.ProgressDetail
- }
- return nil
-}
-
-// Remove FIXME ...
-func (rt *RunningTask) Remove(taskId uint64) (context.CancelFunc,
errors.Error) {
- rt.mu.Lock()
- defer rt.mu.Unlock()
- if d, ok := rt.tasks[taskId]; ok {
- delete(rt.tasks, taskId)
- return d.Cancel, nil
- }
- return nil, errors.NotFound.New(fmt.Sprintf("task with id %d not
found", taskId))
-}
-
-var runningTasks RunningTask
-
-// TaskQuery FIXME ...
+// TaskQuery FIXME .
type TaskQuery struct {
+ Pagination
Status string `form:"status"`
- Page int `form:"page"`
- PageSize int `form:"page_size"`
Plugin string `form:"plugin"`
PipelineId uint64 `form:"pipelineId" uri:"pipelineId"`
Pending int `form:"pending"`
}
-func init() {
- // set all previous unfinished tasks to status failed
- runningTasks.tasks = make(map[uint64]*RunningTaskData)
-}
-
-// CreateTask FIXME ...
+// CreateTask creates a new task
func CreateTask(newTask *models.NewTask) (*models.Task, errors.Error) {
b, err := json.Marshal(newTask.Options)
if err != nil {
@@ -145,7 +53,7 @@ func CreateTask(newTask *models.NewTask) (*models.Task,
errors.Error) {
return nil, errors.Convert(err)
}
- task := models.Task{
+ task := &models.Task{
Plugin: newTask.Plugin,
Subtasks: s,
Options: b,
@@ -155,56 +63,72 @@ func CreateTask(newTask *models.NewTask) (*models.Task,
errors.Error) {
PipelineRow: newTask.PipelineRow,
PipelineCol: newTask.PipelineCol,
}
- err = db.Save(&task).Error
+ if newTask.IsRerun {
+ task.Status = models.TASK_RERUN
+ }
+ err = db.Create(task)
if err != nil {
taskLog.Error(err, "save task failed")
return nil, errors.Internal.Wrap(err, "save task failed")
}
- return &task, nil
+ return task, nil
}
-// GetTasks FIXME ...
-func GetTasks(query *TaskQuery) ([]models.Task, int64, errors.Error) {
- db := db.Model(&models.Task{}).Order("id DESC")
+// GetTasks returns paginated tasks that match the given query
+func GetTasks(query *TaskQuery) ([]*models.Task, int64, errors.Error) {
+ // verify query
+ if err := VerifyStruct(query); err != nil {
+ return nil, 0, err
+ }
+
+ // construct common query clauses
+ clauses := []dal.Clause{dal.From(&models.Task{})}
if query.Status != "" {
- db = db.Where("status = ?", query.Status)
+ clauses = append(clauses, dal.Where("status = ?", query.Status))
}
if query.Plugin != "" {
- db = db.Where("plugin = ?", query.Plugin)
+ clauses = append(clauses, dal.Where("plugin = ?", query.Plugin))
}
if query.PipelineId > 0 {
- db = db.Where("pipeline_id = ?", query.PipelineId)
+ clauses = append(clauses, dal.Where("pipeline_id = ?",
query.PipelineId))
}
if query.Pending > 0 {
- db = db.Where("finished_at is null")
+ clauses = append(clauses, dal.Where("finished_at is null"))
}
- var count int64
- err := db.Count(&count).Error
+
+ // count total records
+ count, err := db.Count(clauses...)
if err != nil {
- return nil, 0, errors.Convert(err)
+ return nil, 0, err
}
- tasks := make([]models.Task, 0)
- err = db.Find(&tasks).Error
+ // load paginated records from db
+ clauses = append(clauses,
+ dal.Orderby("id DESC"),
+ dal.Offset(query.GetSkip()),
+ dal.Limit(query.GetPageSizeOr(10000)),
+ )
+ tasks := make([]*models.Task, 0)
+ err = db.All(&tasks, clauses...)
if err != nil {
- return nil, count, errors.Convert(err)
+ return nil, count, err
}
+ // fill running information
runningTasks.FillProgressDetailToTasks(tasks)
return tasks, count, nil
}
// GetTasksWithLastStatus returns task list of the pipeline, only the most
recently tasks would be returned
-func GetTasksWithLastStatus(pipelineId uint64) ([]models.Task, errors.Error) {
- var tasks []models.Task
- dbInner := db.Model(&models.Task{}).Order("id DESC").Where("pipeline_id
= ?", pipelineId)
- err := dbInner.Find(&tasks).Error
+func GetTasksWithLastStatus(pipelineId uint64) ([]*models.Task, errors.Error) {
+ var tasks []*models.Task
+ err := db.All(&tasks, dal.Where("pipeline_id = ?", pipelineId),
dal.Orderby("id DESC"))
if err != nil {
- return nil, errors.Convert(err)
+ return nil, err
}
taskIds := make(map[int64]struct{})
- var result []models.Task
+ var result []*models.Task
var maxRow, maxCol int
for _, task := range tasks {
if task.PipelineRow > maxRow {
@@ -225,35 +149,12 @@ func GetTasksWithLastStatus(pipelineId uint64)
([]models.Task, errors.Error) {
return result, nil
}
-// SpawnTasks create new tasks from the failed ones
-func SpawnTasks(input []models.Task) ([]models.Task, errors.Error) {
- var result []models.Task
- for _, task := range input {
- task.Model = common.Model{}
- task.Status = models.TASK_CREATED
- task.Message = ""
- task.Progress = 0
- task.ProgressDetail = nil
- task.FailedSubTask = ""
- task.BeganAt = nil
- task.FinishedAt = nil
- task.SpentSeconds = 0
- result = append(result, task)
- }
- err := db.Save(&result).Error
- if err != nil {
- taskLog.Error(err, "save task failed")
- return nil, errors.Internal.Wrap(err, "save task failed")
- }
- return result, nil
-}
-
// GetTask FIXME ...
func GetTask(taskId uint64) (*models.Task, errors.Error) {
task := &models.Task{}
- err := db.First(task, taskId).Error
+ err := db.First(task, dal.Where("id = ?", taskId))
if err != nil {
- if goerror.Is(err, gorm.ErrRecordNotFound) {
+ if db.IsErrorNotFound(err) {
return nil, errors.NotFound.New("task not found")
}
return nil, errors.Internal.Wrap(err, "error getting the task
from database")
@@ -312,63 +213,15 @@ func RunTasksStandalone(parentLogger core.Logger, taskIds
[]uint64) errors.Error
return errors.Convert(err)
}
-func runTaskStandalone(parentLog core.Logger, taskId uint64) errors.Error {
- // deferring cleaning up
- defer func() {
- _, _ = runningTasks.Remove(taskId)
- }()
- // for task cancelling
- ctx, cancel := context.WithCancel(context.Background())
- err := runningTasks.Add(taskId, cancel)
+// RerunTask reruns specified task
+func RerunTask(taskId uint64) (*models.Task, errors.Error) {
+ task, err := GetTask(taskId)
if err != nil {
- return err
- }
- // now , create a progress update channel and kick off
- progress := make(chan core.RunningProgress, 100)
- go updateTaskProgress(taskId, progress)
- err = runner.RunTask(
- ctx,
- runner.CreateBasicRes(cfg, parentLog, db),
- progress,
- taskId,
- )
- close(progress)
- return err
-}
-
-func getRunningTaskById(taskId uint64) *RunningTaskData {
- runningTasks.mu.Lock()
- defer runningTasks.mu.Unlock()
-
- return runningTasks.tasks[taskId]
-}
-
-func updateTaskProgress(taskId uint64, progress chan core.RunningProgress) {
- data := getRunningTaskById(taskId)
- if data == nil {
- return
+ return nil, err
}
- progressDetail := data.ProgressDetail
- for p := range progress {
- runningTasks.mu.Lock()
- runner.UpdateProgressDetail(basicRes, taskId, progressDetail,
&p)
- runningTasks.mu.Unlock()
- }
-}
-
-func getTaskIdFromActivityId(activityId string) (uint64, errors.Error) {
- submatches := activityPattern.FindStringSubmatch(activityId)
- if len(submatches) < 2 {
- return 0, errors.Default.New("activityId does not match")
- }
- return errors.Convert01(strconv.ParseUint(submatches[1], 10, 64))
-}
-
-// DeleteCreatedTasks deletes tasks with status `TASK_CREATED`
-func DeleteCreatedTasks(pipelineId uint64) errors.Error {
- err := db.Where("pipeline_id = ? AND status = ?", pipelineId,
models.TASK_CREATED).Delete(&models.Task{}).Error
+ rerunTasks, err := RerunPipeline(task.PipelineId, task)
if err != nil {
- return errors.Convert(err)
+ return nil, err
}
- return nil
+ return rerunTasks[0], nil
}
diff --git a/services/task_runner.go b/services/task_runner.go
new file mode 100644
index 000000000..373734d81
--- /dev/null
+++ b/services/task_runner.go
@@ -0,0 +1,167 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "sync"
+
+ "github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/models"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/runner"
+)
+
+// RunningTaskData FIXME ...
+type RunningTaskData struct {
+ Cancel context.CancelFunc
+ ProgressDetail *models.TaskProgressDetail
+}
+
+// RunningTask FIXME ...
+type RunningTask struct {
+ mu sync.Mutex
+ tasks map[uint64]*RunningTaskData
+}
+
+// Add FIXME ...
+func (rt *RunningTask) Add(taskId uint64, cancel context.CancelFunc)
errors.Error {
+ rt.mu.Lock()
+ defer rt.mu.Unlock()
+ if _, ok := rt.tasks[taskId]; ok {
+ return errors.Default.New(fmt.Sprintf("task with id %d already
running", taskId))
+ }
+ rt.tasks[taskId] = &RunningTaskData{
+ Cancel: cancel,
+ ProgressDetail: &models.TaskProgressDetail{},
+ }
+ return nil
+}
+
+func (rt *RunningTask) setAll(progressDetails
map[uint64]*models.TaskProgressDetail) {
+ rt.mu.Lock()
+ defer rt.mu.Unlock()
+ // delete finished tasks
+ for taskId := range rt.tasks {
+ if _, ok := progressDetails[taskId]; !ok {
+ delete(rt.tasks, taskId)
+ }
+ }
+ for taskId, progressDetail := range progressDetails {
+ if _, ok := rt.tasks[taskId]; !ok {
+ rt.tasks[taskId] = &RunningTaskData{}
+ }
+ rt.tasks[taskId].ProgressDetail = progressDetail
+ }
+}
+
+// FillProgressDetailToTasks lock less times than GetProgressDetail
+func (rt *RunningTask) FillProgressDetailToTasks(tasks []*models.Task) {
+ rt.mu.Lock()
+ defer rt.mu.Unlock()
+
+ for index, task := range tasks {
+ taskId := task.ID
+ if task, ok := rt.tasks[taskId]; ok {
+ tasks[index].ProgressDetail = task.ProgressDetail
+ }
+ }
+}
+
+// GetProgressDetail FIXME ...
+func (rt *RunningTask) GetProgressDetail(taskId uint64)
*models.TaskProgressDetail {
+ rt.mu.Lock()
+ defer rt.mu.Unlock()
+
+ if task, ok := rt.tasks[taskId]; ok {
+ return task.ProgressDetail
+ }
+ return nil
+}
+
+// Remove FIXME ...
+func (rt *RunningTask) Remove(taskId uint64) (context.CancelFunc,
errors.Error) {
+ rt.mu.Lock()
+ defer rt.mu.Unlock()
+ if d, ok := rt.tasks[taskId]; ok {
+ delete(rt.tasks, taskId)
+ return d.Cancel, nil
+ }
+ return nil, errors.NotFound.New(fmt.Sprintf("task with id %d not
found", taskId))
+}
+
+var runningTasks RunningTask
+
+func init() {
+ // set all previous unfinished tasks to status failed
+ runningTasks.tasks = make(map[uint64]*RunningTaskData)
+}
+
+func runTaskStandalone(parentLog core.Logger, taskId uint64) errors.Error {
+ // deferring cleaning up
+ defer func() {
+ _, _ = runningTasks.Remove(taskId)
+ }()
+ // for task cancelling
+ ctx, cancel := context.WithCancel(context.Background())
+ err := runningTasks.Add(taskId, cancel)
+ if err != nil {
+ return err
+ }
+ // now , create a progress update channel and kick off
+ progress := make(chan core.RunningProgress, 100)
+ go updateTaskProgress(taskId, progress)
+ err = runner.RunTask(
+ ctx,
+ basicRes.ReplaceLogger(parentLog),
+ progress,
+ taskId,
+ )
+ close(progress)
+ return err
+}
+
+func getRunningTaskById(taskId uint64) *RunningTaskData {
+ runningTasks.mu.Lock()
+ defer runningTasks.mu.Unlock()
+
+ return runningTasks.tasks[taskId]
+}
+
+func updateTaskProgress(taskId uint64, progress chan core.RunningProgress) {
+ data := getRunningTaskById(taskId)
+ if data == nil {
+ return
+ }
+ progressDetail := data.ProgressDetail
+ for p := range progress {
+ runningTasks.mu.Lock()
+ runner.UpdateProgressDetail(basicRes, taskId, progressDetail,
&p)
+ runningTasks.mu.Unlock()
+ }
+}
+
+func getTaskIdFromActivityId(activityId string) (uint64, errors.Error) {
+ submatches := activityPattern.FindStringSubmatch(activityId)
+ if len(submatches) < 2 {
+ return 0, errors.Default.New("activityId does not match")
+ }
+ return errors.Convert01(strconv.ParseUint(submatches[1], 10, 64))
+}