This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 12ac469f7 replace gorm with dal from services module (#3986)
12ac469f7 is described below

commit 12ac469f7a96dd79e0b63276dfc0e8cc220bac0e
Author: Klesh Wong <[email protected]>
AuthorDate: Wed Dec 21 11:02:52 2022 +0800

    replace gorm with dal from services module (#3986)
    
    * refactor: add ConfigReader to BasicRes interface
    
    * refactor: use ConfigReader in `services`
    
    * refactor: replace gorm with dal from service mod
    
    * fix: rerun swallowing errors
    
    * fix: cancelled pipelines turned to failed after restart
    
    * fix: listing pipelines api not working
    
    * fix: listing projects api not working
    
    * fix: keep tasks statuses as is for diagnose
    
    * fix: filter by label
    
    * fix: make rerun great again
    
    * fix: bp label filter
    
    * fix: patch project enable
    
    * feat: re-organise debugging scripts
    
    * refactor: remove gorm
    
    * fix: linting errors
    
    * fix: for cr
---
 api/blueprints/blueprints.go                       |  40 +--
 api/pipelines/pipelines.go                         |  69 ++--
 api/project/project.go                             | 159 +---------
 api/router.go                                      |   9 +-
 api/task/task.go                                   |  89 +-----
 impl/dalgorm/dalgorm.go                            |   5 +
 models/project.go                                  |   4 +-
 models/task.go                                     |   1 +
 plugins/core/dal/dal.go                            |   4 +-
 plugins/core/plugin_handle.go                      |  28 --
 runner/run_pipeline.go                             |   2 +-
 scripts/pm/framework/api-test.sh                   |  60 ++++
 ...ine-get-detail.sh => blueprint-v200-trigger.sh} |   4 +-
 scripts/pm/framework/pipeline-cancel.sh            |   2 +-
 scripts/pm/framework/pipeline-get-detail.sh        |   2 +-
 .../{pipeline-get-detail.sh => pipeline-rerun.sh}  |   4 +-
 .../{pipeline-get-detail.sh => task-rerun.sh}      |   4 +-
 .../{create-blueprint.sh => blueprint-create.sh}   |   0
 ...-blueprint-v200.sh => blueprint-v200-create.sh} |   4 +-
 scripts/pm/github/{user.sh => gh-user-get.sh}      |   0
 .../{trigger-pipline.sh => pipeline-create.sh}     |   4 +-
 scripts/pm/github/{put-repo.sh => repo-create.sh}  |   0
 ...ationrules.sh => transformationrules-create.sh} |   0
 services/base.go                                   |  61 ++++
 services/blueprint.go                              |  52 +--
 services/blueprint_helper.go                       | 108 +++----
 services/blueprint_makeplan_v200.go                |  14 +-
 services/dm_code.go                                |  14 +-
 services/init.go                                   |  33 +-
 services/notification.go                           |   7 +-
 services/page_helper.go                            |  35 ---
 services/pipeline.go                               | 184 +++++++++--
 services/pipeline_helper.go                        |  90 +++---
 services/pipeline_runner.go                        |  10 +-
 services/project.go                                | 349 ++++++++++-----------
 services/project_helper.go                         | 276 ----------------
 services/pushapi.go                                |  13 +-
 services/task.go                                   | 249 +++------------
 services/task_runner.go                            | 167 ++++++++++
 39 files changed, 892 insertions(+), 1264 deletions(-)

diff --git a/api/blueprints/blueprints.go b/api/blueprints/blueprints.go
index 40cbeecb3..3581d326c 100644
--- a/api/blueprints/blueprints.go
+++ b/api/blueprints/blueprints.go
@@ -111,26 +111,26 @@ func Get(c *gin.Context) {
        shared.ApiOutputSuccess(c, blueprint, http.StatusOK)
 }
 
-// @Summary delete blueprints
-// @Description Delete BluePrints
-// @Tags framework/blueprints
-// @Param blueprintId path string true "blueprintId"
-// @Success 200
-// @Failure 400  {object} shared.ApiBody "Bad Request"
-// @Failure 500  {object} shared.ApiBody "Internal Error"
-// @Router /blueprints/{blueprintId} [delete]
-func Delete(c *gin.Context) {
-       pipelineId := c.Param("blueprintId")
-       id, err := strconv.ParseUint(pipelineId, 10, 64)
-       if err != nil {
-               shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad 
blueprintID format supplied"))
-               return
-       }
-       err = services.DeleteBlueprint(id)
-       if err != nil {
-               shared.ApiOutputError(c, errors.Default.Wrap(err, "error 
deleting blueprint"))
-       }
-}
+// // @Summary delete blueprints
+// // @Description Delete BluePrints
+// // @Tags framework/blueprints
+// // @Param blueprintId path string true "blueprintId"
+// // @Success 200
+// // @Failure 400  {object} shared.ApiBody "Bad Request"
+// // @Failure 500  {object} shared.ApiBody "Internal Error"
+// // @Router /blueprints/{blueprintId} [delete]
+// func Delete(c *gin.Context) {
+//     pipelineId := c.Param("blueprintId")
+//     id, err := strconv.ParseUint(pipelineId, 10, 64)
+//     if err != nil {
+//             shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad 
blueprintID format supplied"))
+//             return
+//     }
+//     err = services.DeleteBlueprint(id)
+//     if err != nil {
+//             shared.ApiOutputError(c, errors.Default.Wrap(err, "error 
deleting blueprint"))
+//     }
+// }
 
 // @Summary patch blueprints
 // @Description patch blueprints
diff --git a/api/pipelines/pipelines.go b/api/pipelines/pipelines.go
index c35ab5d19..b020e3f53 100644
--- a/api/pipelines/pipelines.go
+++ b/api/pipelines/pipelines.go
@@ -31,17 +31,6 @@ import (
        "github.com/gin-gonic/gin/binding"
 )
 
-/*
-Create and run a new pipeline
-POST /pipelines
-{
-       "name": "name-of-pipeline",
-       "tasks": [
-               [ {"plugin": "gitlab", ...}, {"plugin": "jira"} ],
-               [ {"plugin": "github", ...}],
-       ]
-}
-*/
 // @Summary Create and run a new pipeline
 // @Description Create and run a new pipeline
 // @Tags framework/pipelines
@@ -69,17 +58,6 @@ func Post(c *gin.Context) {
        shared.ApiOutputSuccess(c, pipeline, http.StatusCreated)
 }
 
-/*
-Get list of pipelines
-GET /pipelines?status=TASK_RUNNING&pending=1&page=1&pagesize=10
-{
-       "pipelines": [
-               {"id": 1, "name": "test-pipeline", ...}
-       ],
-       "count": 5
-}
-*/
-
 // @Summary Get list of pipelines
 // @Description GET 
/pipelines?status=TASK_RUNNING&pending=1&label=search_text&page=1&pagesize=10
 // @Tags framework/pipelines
@@ -108,16 +86,7 @@ func Index(c *gin.Context) {
        shared.ApiOutputSuccess(c, shared.ResponsePipelines{Pipelines: 
pipelines, Count: count}, http.StatusOK)
 }
 
-/*
-Get detail of a pipeline
-GET /pipelines/:pipelineId
-{
-       "id": 1,
-       "name": "test-pipeline",
-       ...
-}
-*/
-// @Get detail of a pipeline
+// @Summary Get detail of a pipeline
 // @Description GET /pipelines/:pipelineId
 // @Description RETURN SAMPLE
 // @Description {
@@ -146,11 +115,7 @@ func Get(c *gin.Context) {
        shared.ApiOutputSuccess(c, pipeline, http.StatusOK)
 }
 
-/*
-Cancel a pending pipeline
-DELETE /pipelines/:pipelineId
-*/
-// @Cancel a pending pipeline
+// @Summary Cancel a pending pipeline
 // @Description Cancel a pending pipeline
 // @Tags framework/pipelines
 // @Param pipelineId path int true "pipeline ID"
@@ -173,11 +138,7 @@ func Delete(c *gin.Context) {
        shared.ApiOutputSuccess(c, nil, http.StatusOK)
 }
 
-/*
-Get download logs of a pipeline
-GET /pipelines/:pipelineId/logging.tar.gz
-*/
-// download logs of a pipeline
+// @Summary download logs of a pipeline
 // @Description GET /pipelines/:pipelineId/logging.tar.gz
 // @Tags framework/pipelines
 // @Param pipelineId path int true "query"
@@ -206,3 +167,27 @@ func DownloadLogs(c *gin.Context) {
        defer os.Remove(archive)
        c.FileAttachment(archive, filepath.Base(archive))
 }
+
+// PostRerun reruns all failed tasks of the specified pipeline
+// @Summary rerun tasks
+// @Tags framework/pipelines
+// @Accept application/json
+// @Param pipelineId path int true "pipelineId"
+// @Success 200  {object} []models.Task
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /pipelines/{pipelineId}/rerun [post]
+func PostRerun(c *gin.Context) {
+       pipelineId := c.Param("pipelineId")
+       id, err := strconv.ParseUint(pipelineId, 10, 64)
+       if err != nil {
+               shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad 
pipelineID format supplied"))
+               return
+       }
+       rerunTasks, err := services.RerunPipeline(id, nil)
+       if err != nil {
+               shared.ApiOutputError(c, errors.Default.Wrap(err, "failed to 
rerun pipeline"))
+               return
+       }
+       shared.ApiOutputSuccess(c, rerunTasks, http.StatusOK)
+}
diff --git a/api/project/project.go b/api/project/project.go
index 786d8a75f..22e25b524 100644
--- a/api/project/project.go
+++ b/api/project/project.go
@@ -18,7 +18,6 @@ limitations under the License.
 package project
 
 import (
-       "fmt"
        "net/http"
 
        "github.com/apache/incubator-devlake/api/shared"
@@ -29,8 +28,8 @@ import (
 )
 
 type PaginatedProjects struct {
-       Projects []*models.BaseProject `json:"projects"`
-       Count    int64                 `json:"count"`
+       Projects []*models.Project `json:"projects"`
+       Count    int64             `json:"count"`
 }
 
 // @Summary Create and run a new project
@@ -43,22 +42,13 @@ type PaginatedProjects struct {
 // @Failure 500  {string} errcode.Error "Internal Error"
 // @Router /projects/:projectName [get]
 func GetProject(c *gin.Context) {
-       projectOutput := &models.ApiOutputProject{}
        projectName := c.Param("projectName")
 
-       project, err := services.GetProject(projectName)
+       projectOutput, err := services.GetProject(projectName)
        if err != nil {
                shared.ApiOutputError(c, errors.Default.Wrap(err, "error 
getting project"))
                return
        }
-
-       projectOutput.BaseProject = project.BaseProject
-       err = services.LoadBluePrintAndMetrics(projectOutput)
-       if err != nil {
-               shared.ApiOutputError(c, errors.Default.Wrap(err, 
fmt.Sprintf("Failed to LoadBluePrintAndMetrics on GetProject for %s", 
projectOutput.Name)))
-               return
-       }
-
        shared.ApiOutputSuccess(c, projectOutput, http.StatusOK)
 }
 
@@ -84,13 +74,8 @@ func GetProjects(c *gin.Context) {
                return
        }
 
-       baseProjects := make([]*models.BaseProject, count)
-       for i, project := range projects {
-               baseProjects[i] = &project.BaseProject
-       }
-
        shared.ApiOutputSuccess(c, PaginatedProjects{
-               Projects: baseProjects,
+               Projects: projects,
                Count:    count,
        }, http.StatusOK)
 }
@@ -106,45 +91,18 @@ func GetProjects(c *gin.Context) {
 // @Router /projects [post]
 func PostProject(c *gin.Context) {
        projectInput := &models.ApiInputProject{}
-       projectOutput := &models.ApiOutputProject{}
-
        err := c.ShouldBind(projectInput)
        if err != nil {
                shared.ApiOutputError(c, errors.BadInput.Wrap(err, 
shared.BadRequestBody))
                return
        }
 
-       err = services.CreateProject(&models.Project{BaseProject: 
projectInput.BaseProject})
+       projectOutput, err := services.CreateProject(projectInput)
        if err != nil {
                shared.ApiOutputError(c, errors.BadInput.Wrap(err, "error 
creating project"))
                return
        }
 
-       // check if need to changed the blueprint setting
-       if projectInput.Enable != nil {
-               _, err = 
services.PatchBlueprintEnableByProjectName(projectInput.Name, 
*projectInput.Enable)
-               if err != nil {
-                       shared.ApiOutputError(c, errors.BadInput.Wrap(err, 
"Failed to set if project enable"))
-                       return
-               }
-       }
-
-       // check if need flush the Metrics
-       if projectInput.Metrics != nil {
-               err = services.FlushProjectMetrics(projectInput.Name, 
projectInput.Metrics)
-               if err != nil {
-                       shared.ApiOutputError(c, errors.BadInput.Wrap(err, 
"Failed to flush project metrics"))
-                       return
-               }
-       }
-
-       projectOutput.BaseProject = projectInput.BaseProject
-       err = services.LoadBluePrintAndMetrics(projectOutput)
-       if err != nil {
-               shared.ApiOutputError(c, errors.Default.Wrap(err, 
fmt.Sprintf("Failed to LoadBluePrintAndMetrics on PostProject for %s", 
projectOutput.Name)))
-               return
-       }
-
        shared.ApiOutputSuccess(c, projectOutput, http.StatusCreated)
 }
 
@@ -175,110 +133,3 @@ func PatchProject(c *gin.Context) {
 
        shared.ApiOutputSuccess(c, projectOutput, http.StatusCreated)
 }
-
-// @Cancel a project
-// @Description Cancel a project
-// @Tags framework/projects
-// @Success 200
-// @Failure 400  {string} er2rcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internel Error"
-// @Router /projects/:projectName [delete]
-//func DeleteProject(c *gin.Context) {
-//}
-
-// @Summary Get a ProjectMetrics
-// @Description Get a ProjectMetrics
-// @Tags framework/ProjectMetrics
-// @Param projectName path string true "project name"
-// @Param pluginName path string true "plugin name"
-// @Success 200  {object} models.BaseProjectMetric
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internel Error"
-// @Router /projects/:projectName/metrics/:pluginName [get]
-func GetProjectMetrics(c *gin.Context) {
-       projectName := c.Param("projectName")
-       pluginName := c.Param("pluginName")
-
-       projectMetric, err := services.GetProjectMetric(projectName, pluginName)
-       if err != nil {
-               shared.ApiOutputError(c, errors.BadInput.Wrap(err, "error 
getting project metric"))
-               return
-       }
-
-       shared.ApiOutputSuccess(c, projectMetric.BaseProjectMetric, 
http.StatusOK)
-}
-
-// @Summary Create a new ProjectMetrics
-// @Description Create  a new ProjectMetrics
-// @Tags framework/ProjectMetrics
-// @Accept application/json
-// @Param project body models.BaseProjectMetric true "json"
-// @Success 200  {object} models.BaseProjectMetric
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internal Error"
-// @Router /projects/:projectName/metrics [post]
-func PostProjectMetrics(c *gin.Context) {
-       projectMetric := &models.BaseProjectMetric{}
-
-       projectName := c.Param("projectName")
-
-       _, err1 := services.GetProject(projectName)
-       if err1 != nil {
-               shared.ApiOutputError(c, errors.BadInput.Wrap(err1, 
shared.BadRequestBody))
-               return
-       }
-
-       err := c.ShouldBind(projectMetric)
-       if err != nil {
-               shared.ApiOutputError(c, errors.BadInput.Wrap(err, 
shared.BadRequestBody))
-               return
-       }
-
-       projectMetric.ProjectName = projectName
-       err = 
services.CreateProjectMetric(&models.ProjectMetric{BaseProjectMetric: 
*projectMetric})
-       if err != nil {
-               shared.ApiOutputError(c, errors.BadInput.Wrap(err, "error 
creating project metric"))
-               return
-       }
-
-       shared.ApiOutputSuccess(c, projectMetric, http.StatusCreated)
-}
-
-// @Summary Patch a ProjectMetrics
-// @Description Patch a ProjectMetrics
-// @Tags framework/ProjectMetrics
-// @Accept application/json
-// @Param ProjectMetrics body models.BaseProjectMetric true "json"
-// @Success 200  {object} models.BaseProjectMetric
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internal Error"
-// @Router /projects/:projectName/metrics/:pluginName  [patch]
-func PatchProjectMetrics(c *gin.Context) {
-       projectName := c.Param("projectName")
-       pluginName := c.Param("pluginName")
-
-       var body map[string]interface{}
-       err := c.ShouldBind(&body)
-       if err != nil {
-               shared.ApiOutputError(c, errors.BadInput.Wrap(err, 
shared.BadRequestBody))
-               return
-       }
-
-       projectMetric, err := services.PatchProjectMetric(projectName, 
pluginName, body)
-       if err != nil {
-               shared.ApiOutputError(c, errors.BadInput.Wrap(err, "error patch 
project metric"))
-               return
-       }
-
-       shared.ApiOutputSuccess(c, projectMetric.BaseProjectMetric, 
http.StatusCreated)
-}
-
-// @delete a ProjectMetrics
-// @Description delete a ProjectMetrics
-// @Tags framework/ProjectMetrics
-// @Success 200
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internel Error"
-// @Router /project_metrics/:projectName/:pluginName [delete]
-//func DeleteProjectMetrics(c *gin.Context) {
-//}
diff --git a/api/router.go b/api/router.go
index 1c50f6236..d7311c4fd 100644
--- a/api/router.go
+++ b/api/router.go
@@ -51,7 +51,8 @@ func RegisterRouter(r *gin.Engine) {
        r.GET("/blueprints/:blueprintId/pipelines", 
blueprints.GetBlueprintPipelines)
        r.DELETE("/pipelines/:pipelineId", pipelines.Delete)
        r.GET("/pipelines/:pipelineId/tasks", task.GetTaskByPipeline)
-       r.POST("/pipelines/:pipelineId/tasks", task.RerunTask)
+       r.POST("/pipelines/:pipelineId/rerun", pipelines.PostRerun)
+       r.POST("/tasks/:taskId/rerun", task.PostRerun)
 
        r.GET("/pipelines/:pipelineId/logging.tar.gz", pipelines.DownloadLogs)
 
@@ -71,12 +72,6 @@ func RegisterRouter(r *gin.Engine) {
        r.POST("/projects", project.PostProject)
        r.GET("/projects", project.GetProjects)
 
-       // project metric api
-       r.GET("/projects/:projectName/metrics/:pluginName", 
project.GetProjectMetrics)
-       r.PATCH("/projects/:projectName/metrics/:pluginName", 
project.PatchProjectMetrics)
-       //r.DELETE("/projects/:projectName/metrics/:pluginName", 
project.DeleteProjectMetrics)
-       r.POST("/projects/:projectName/metrics", project.PostProjectMetrics)
-
        // mount all api resources for all plugins
        pluginsApiResources, err := services.GetPluginsApiResources()
        if err != nil {
diff --git a/api/task/task.go b/api/task/task.go
index 17bee977a..83dd9cb83 100644
--- a/api/task/task.go
+++ b/api/task/task.go
@@ -44,8 +44,8 @@ func Delete(c *gin.Context) {
 }
 
 type getTaskResponse struct {
-       Tasks []models.Task `json:"tasks"`
-       Count int           `json:"count"`
+       Tasks []*models.Task `json:"tasks"`
+       Count int            `json:"count"`
 }
 
 // GetTaskByPipeline return most recent tasks
@@ -71,88 +71,25 @@ func GetTaskByPipeline(c *gin.Context) {
        shared.ApiOutputSuccess(c, getTaskResponse{Tasks: tasks, Count: 
len(tasks)}, http.StatusOK)
 }
 
-type rerunRequest struct {
-       TaskId uint64 `json:"taskId"`
-}
-
-// RerunTask rerun the specified the task. If taskId is 0, all failed tasks of 
this pipeline will rerun
-// @Summary rerun tasks
+// PostRerun reruns the specified task.
+// @Summary rerun task
 // @Tags framework/task
 // @Accept application/json
-// @Param pipelineId path int true "pipelineId"
-// @Param request body rerunRequest false "specify the task to rerun. If it's 
0, all failed tasks of this pipeline will rerun"
-// @Success 200  {object} shared.ApiBody
+// @Success 200  {object} models.Task
 // @Failure 400  {object} shared.ApiBody "Bad Request"
 // @Failure 500  {object} shared.ApiBody "Internal Error"
-// @Router /pipelines/{pipelineId}/tasks [post]
-func RerunTask(c *gin.Context) {
-       var request rerunRequest
-       err := c.BindJSON(&request)
-       if err != nil {
-               shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid 
task ID format"))
-               return
-       }
-       pipelineId, err := strconv.ParseUint(c.Param("pipelineId"), 10, 64)
-       if err != nil {
-               shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid 
pipeline ID format"))
-               return
-       }
-       pipeline, err := services.GetPipeline(pipelineId)
-       if err != nil {
-               shared.ApiOutputError(c, errors.Default.Wrap(err, "error get 
pipeline"))
-               return
-       }
-       if pipeline.Status == models.TASK_RUNNING {
-               shared.ApiOutputError(c, errors.BadInput.New("pipeline is 
running"))
-               return
-       }
-       if pipeline.Status == models.TASK_CREATED || pipeline.Status == 
models.TASK_RERUN {
-               shared.ApiOutputError(c, errors.BadInput.New("pipeline is 
waiting to run"))
-               return
-       }
-
-       var failedTasks []models.Task
-       if request.TaskId > 0 {
-               failedTask, err := services.GetTask(request.TaskId)
-               if err != nil || failedTask == nil {
-                       shared.ApiOutputError(c, errors.Default.Wrap(err, 
"error getting failed task"))
-                       return
-               }
-               if failedTask.PipelineId != pipelineId {
-                       shared.ApiOutputError(c, errors.BadInput.New("the task 
ID and pipeline ID doesn't match"))
-                       return
-               }
-               failedTasks = append(failedTasks, *failedTask)
-       } else {
-               tasks, err := services.GetTasksWithLastStatus(pipelineId)
-               if err != nil {
-                       shared.ApiOutputError(c, errors.Default.Wrap(err, 
"error getting tasks"))
-                       return
-               }
-               for _, task := range tasks {
-                       if task.Status == models.TASK_FAILED {
-                               failedTasks = append(failedTasks, task)
-                       }
-               }
-       }
-       if len(failedTasks) == 0 {
-               shared.ApiOutputSuccess(c, nil, http.StatusOK)
-               return
-       }
-       err = services.DeleteCreatedTasks(pipelineId)
-       if err != nil {
-               shared.ApiOutputError(c, errors.Default.Wrap(err, "error delete 
tasks"))
-               return
-       }
-       _, err = services.SpawnTasks(failedTasks)
+// @Router /tasks/{taskId}/rerun [post]
+func PostRerun(c *gin.Context) {
+       taskId := c.Param("taskId")
+       id, err := strconv.ParseUint(taskId, 10, 64)
        if err != nil {
-               shared.ApiOutputError(c, errors.Default.Wrap(err, "error create 
tasks"))
+               shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad taskId 
format supplied"))
                return
        }
-       err = services.UpdateDbPipelineStatus(pipelineId, models.TASK_RERUN)
+       task, err := services.RerunTask(id)
        if err != nil {
-               shared.ApiOutputError(c, errors.Default.Wrap(err, "error create 
tasks"))
+               shared.ApiOutputError(c, err)
                return
        }
-       shared.ApiOutputSuccess(c, nil, http.StatusOK)
+       shared.ApiOutputSuccess(c, task, http.StatusOK)
 }
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index b239456fb..a9a2e8403 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -360,6 +360,11 @@ func (d *Dalgorm) IsErrorNotFound(err errors.Error) bool {
        return errors.Is(err, gorm.ErrRecordNotFound)
 }
 
+// IsDuplicationError returns true if the sql error is a duplication error.
+func (d *Dalgorm) IsDuplicationError(err errors.Error) bool {
+       return strings.Contains(err.Error(), "duplicate")
+}
+
 // NewDalgorm creates a *Dalgorm
 func NewDalgorm(db *gorm.DB) *Dalgorm {
        return &Dalgorm{db}
diff --git a/models/project.go b/models/project.go
index d3596b6a8..2b7c4ab22 100644
--- a/models/project.go
+++ b/models/project.go
@@ -20,7 +20,7 @@ package models
 import "github.com/apache/incubator-devlake/models/common"
 
 type BaseProject struct {
-       Name        string `json:"name" mapstructure:"name" 
gorm:"primaryKey;type:varchar(255)"`
+       Name        string `json:"name" mapstructure:"name" 
gorm:"primaryKey;type:varchar(255)" validate:"required"`
        Description string `json:"description" mapstructure:"description" 
gorm:"type:text"`
 }
 
@@ -34,7 +34,7 @@ func (Project) TableName() string {
 }
 
 type BaseMetric struct {
-       PluginName   string `json:"pluginName" mapstructure:"pluginName" 
gorm:"primaryKey;type:varchar(255)"`
+       PluginName   string `json:"pluginName" mapstructure:"pluginName" 
gorm:"primaryKey;type:varchar(255)" validate:"required"`
        PluginOption string `json:"pluginOption" mapstructure:"pluginOption" 
gorm:"type:text"`
        Enable       bool   `json:"enable" mapstructure:"enable" 
gorm:"type:boolean"`
 }
diff --git a/models/task.go b/models/task.go
index bfc9d8ab7..7f09c3437 100644
--- a/models/task.go
+++ b/models/task.go
@@ -72,6 +72,7 @@ type NewTask struct {
        PipelineId  uint64 `json:"-"`
        PipelineRow int    `json:"-"`
        PipelineCol int    `json:"-"`
+       IsRerun     bool   `json:"-"`
 }
 
 type Subtask struct {
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 46077b753..90f546645 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -140,8 +140,10 @@ type Dal interface {
        Session(config SessionConfig) Dal
        // Begin create a new transaction
        Begin() Transaction
-       // checking if the sql error is not found.
+       // IsErrorNotFound returns true if error is record-not-found
        IsErrorNotFound(err errors.Error) bool
+       // IsDuplicationError returns true if error is duplicate-error
+       IsDuplicationError(err errors.Error) bool
 }
 
 type Transaction interface {
diff --git a/plugins/core/plugin_handle.go b/plugins/core/plugin_handle.go
deleted file mode 100644
index 3bbe371be..000000000
--- a/plugins/core/plugin_handle.go
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package core
-
-import (
-       "github.com/apache/incubator-devlake/errors"
-)
-
-type PluginHandle interface {
-       // When the projectName in the framework layer is changed
-       // this interface will be checked and this method will be called to 
synchronize the projectName of each plugin
-       RenameProjectName(oldName string, newName string) errors.Error
-}
diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index c964ad27c..7c0d36cc0 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -37,7 +37,7 @@ func RunPipeline(
        var tasks []models.Task
        err := db.All(
                &tasks,
-               dal.Where("pipeline_id = ? AND status = ?", pipelineId, 
models.TASK_CREATED),
+               dal.Where("pipeline_id = ? AND status in ?", pipelineId, 
[]string{models.TASK_CREATED, models.TASK_RERUN}),
                dal.Orderby("pipeline_row, pipeline_col"),
        )
        if err != nil {
diff --git a/scripts/pm/framework/api-test.sh b/scripts/pm/framework/api-test.sh
new file mode 100755
index 000000000..758aa937c
--- /dev/null
+++ b/scripts/pm/framework/api-test.sh
@@ -0,0 +1,60 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+. "$(dirname $0)/../vars/active-vars.sh"
+
+request() {
+  response="$(curl -s "$LAKE_ENDPOINT/$1" | jq --color-output)"
+  echo "
+  INDEX: $2 URL: $LAKE_ENDPOINT/$1
+  $response
+  " | less --RAW-CONTROL-CHARS
+  read PAUSE
+}
+
+start=$1
+length=$2
+counter=0
+for url in \
+"pipelines" \
+"pipelines?pageSize=1&page=1" \
+"pipelines?pageSize=1&page=2" \
+"pipelines?blueprint_id=1" \
+"pipelines?status=TASK_FAILED" \
+"pipelines?pending=1" \
+"pipelines?label=foobar" \
+"blueprints" \
+"blueprints?pageSize=1&page=1" \
+"blueprints?pageSize=1&page=2" \
+"blueprints?enable=1" \
+"blueprints?is_manual=0" \
+"blueprints?label=foobar" \
+; do
+  counter=$(($counter+1))
+  if [ "$start" = "-l" ]; then
+    printf "#%-3s %s\n" "$counter" "$url"
+    continue
+  fi
+  if [ -n "$start" ] && [ "$counter" -lt "$start" ]; then
+    continue
+  fi
+  if [ -n "$length" ] && [ "$counter" -ge $(($start+$length)) ]; then
+    continue
+  fi
+  request "$url" $counter
+done
\ No newline at end of file
diff --git a/scripts/pm/framework/pipeline-get-detail.sh 
b/scripts/pm/framework/blueprint-v200-trigger.sh
similarity index 90%
copy from scripts/pm/framework/pipeline-get-detail.sh
copy to scripts/pm/framework/blueprint-v200-trigger.sh
index 7153c67a2..910e2b7ea 100755
--- a/scripts/pm/framework/pipeline-get-detail.sh
+++ b/scripts/pm/framework/blueprint-v200-trigger.sh
@@ -18,6 +18,6 @@
 
 . "$(dirname $0)/../vars/active-vars.sh"
 
-pipeline_id=${1-"2"}
+blueprint_id=${1-"3"}
 
-curl -sv $LAKE_ENDPOINT/pipelines/$pipeline_id | jq
+curl -sv -XPOST $LAKE_ENDPOINT/blueprints/$blueprint_id/trigger | jq
\ No newline at end of file
diff --git a/scripts/pm/framework/pipeline-cancel.sh 
b/scripts/pm/framework/pipeline-cancel.sh
index 1c1f1bdb4..e62c74534 100755
--- a/scripts/pm/framework/pipeline-cancel.sh
+++ b/scripts/pm/framework/pipeline-cancel.sh
@@ -17,6 +17,6 @@
 #
 
 . "$(dirname $0)/../vars/active-vars.sh"
-ID=${1-17}
+ID=${1-32}
 
 curl -sv -XDELETE $LAKE_ENDPOINT/pipelines/$ID | jq
diff --git a/scripts/pm/framework/pipeline-get-detail.sh 
b/scripts/pm/framework/pipeline-get-detail.sh
index 7153c67a2..081006e37 100755
--- a/scripts/pm/framework/pipeline-get-detail.sh
+++ b/scripts/pm/framework/pipeline-get-detail.sh
@@ -18,6 +18,6 @@
 
 . "$(dirname $0)/../vars/active-vars.sh"
 
-pipeline_id=${1-"2"}
+pipeline_id=${1-"31"}
 
 curl -sv $LAKE_ENDPOINT/pipelines/$pipeline_id | jq
diff --git a/scripts/pm/framework/pipeline-get-detail.sh 
b/scripts/pm/framework/pipeline-rerun.sh
similarity index 90%
copy from scripts/pm/framework/pipeline-get-detail.sh
copy to scripts/pm/framework/pipeline-rerun.sh
index 7153c67a2..808d1d346 100755
--- a/scripts/pm/framework/pipeline-get-detail.sh
+++ b/scripts/pm/framework/pipeline-rerun.sh
@@ -18,6 +18,6 @@
 
 . "$(dirname $0)/../vars/active-vars.sh"
 
-pipeline_id=${1-"2"}
+pipeline_id=${1-42}
 
-curl -sv $LAKE_ENDPOINT/pipelines/$pipeline_id | jq
+curl -sv -XPOST $LAKE_ENDPOINT/pipelines/$pipeline_id/rerun | jq
\ No newline at end of file
diff --git a/scripts/pm/framework/pipeline-get-detail.sh 
b/scripts/pm/framework/task-rerun.sh
similarity index 91%
copy from scripts/pm/framework/pipeline-get-detail.sh
copy to scripts/pm/framework/task-rerun.sh
index 7153c67a2..1a565af42 100755
--- a/scripts/pm/framework/pipeline-get-detail.sh
+++ b/scripts/pm/framework/task-rerun.sh
@@ -18,6 +18,6 @@
 
 . "$(dirname $0)/../vars/active-vars.sh"
 
-pipeline_id=${1-"2"}
+task_id=${1-63}
 
-curl -sv $LAKE_ENDPOINT/pipelines/$pipeline_id | jq
+curl -sv -XPOST $LAKE_ENDPOINT/tasks/$task_id/rerun | jq
\ No newline at end of file
diff --git a/scripts/pm/github/create-blueprint.sh 
b/scripts/pm/github/blueprint-create.sh
similarity index 100%
rename from scripts/pm/github/create-blueprint.sh
rename to scripts/pm/github/blueprint-create.sh
diff --git a/scripts/pm/github/create-blueprint-v200.sh 
b/scripts/pm/github/blueprint-v200-create.sh
similarity index 97%
rename from scripts/pm/github/create-blueprint-v200.sh
rename to scripts/pm/github/blueprint-v200-create.sh
index 8745c3720..ad8b8d843 100755
--- a/scripts/pm/github/create-blueprint-v200.sh
+++ b/scripts/pm/github/blueprint-v200-create.sh
@@ -29,7 +29,6 @@ curl -sv $LAKE_ENDPOINT/blueprints \
        "isManual": true,
        "mode": "NORMAL",
        "name": "My GitHub Blueprint",
-    "projectName": "$PROJECT_NAME",
     "settings": {
         "version": "2.0.0",
         "skipOnFail": false,
@@ -45,6 +44,7 @@ curl -sv $LAKE_ENDPOINT/blueprints \
                 ]
             }
         ]
-    }
+    },
+    "labels": ["foobar"]
 }
 JSON
diff --git a/scripts/pm/github/user.sh b/scripts/pm/github/gh-user-get.sh
similarity index 100%
rename from scripts/pm/github/user.sh
rename to scripts/pm/github/gh-user-get.sh
diff --git a/scripts/pm/github/trigger-pipline.sh 
b/scripts/pm/github/pipeline-create.sh
similarity index 93%
rename from scripts/pm/github/trigger-pipline.sh
rename to scripts/pm/github/pipeline-create.sh
index 85965697a..900ea38df 100755
--- a/scripts/pm/github/trigger-pipline.sh
+++ b/scripts/pm/github/pipeline-create.sh
@@ -25,6 +25,7 @@ curl -sv $LAKE_ENDPOINT/pipelines --data @- <<JSON | jq
         [
             {
                 "plugin": "github",
+                "subtasks": ["collectApiIssues"],
                 "options": {
                     "connectionId": 1,
                     "owner": "apache",
@@ -32,6 +33,7 @@ curl -sv $LAKE_ENDPOINT/pipelines --data @- <<JSON | jq
                 }
             }
         ]
-    ]
+    ],
+    "labels": [ "foobar" ]
 }
 JSON
diff --git a/scripts/pm/github/put-repo.sh b/scripts/pm/github/repo-create.sh
similarity index 100%
rename from scripts/pm/github/put-repo.sh
rename to scripts/pm/github/repo-create.sh
diff --git a/scripts/pm/github/create-transformationrules.sh 
b/scripts/pm/github/transformationrules-create.sh
similarity index 100%
rename from scripts/pm/github/create-transformationrules.sh
rename to scripts/pm/github/transformationrules-create.sh
diff --git a/services/base.go b/services/base.go
new file mode 100644
index 000000000..07e3a3f4c
--- /dev/null
+++ b/services/base.go
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import "github.com/apache/incubator-devlake/errors"
+
+// Pagination holds the page number and page size of a paginated query
+type Pagination struct {
+       Page     int `form:"page"`
+       PageSize int `form:"pageSize"`
+}
+
+// GetPage returns the current page number, or 1 when Page is below 1
+func (p *Pagination) GetPage() int {
+       if p.Page < 1 {
+               return 1
+       }
+       return p.Page
+}
+
+// GetPageSize returns the page size, or 50 when PageSize is below 1
+func (p *Pagination) GetPageSize() int {
+       return p.GetPageSizeOr(50)
+}
+
+// GetPageSizeOr returns the page size, or `defaultVal` if PageSize is below 1
+func (p *Pagination) GetPageSizeOr(defaultVal int) int {
+       if p.PageSize < 1 {
+               return defaultVal
+       }
+       return p.PageSize
+}
+
+// GetSkip returns how many records should be skipped for the current page
+func (p *Pagination) GetSkip() int {
+       return (p.GetPage() - 1) * p.GetPageSize()
+}
+
+// VerifyStruct validates the given struct with the package validator `vld`
+func VerifyStruct(v interface{}) errors.Error {
+       err := vld.Struct(v)
+       if err != nil {
+               return errors.BadInput.Wrap(err, "data verification failed")
+       }
+       return nil
+}
diff --git a/services/blueprint.go b/services/blueprint.go
index c1f5c8491..5b287d268 100644
--- a/services/blueprint.go
+++ b/services/blueprint.go
@@ -19,7 +19,6 @@ package services
 
 import (
        "encoding/json"
-       goerror "errors"
        "fmt"
        "strings"
 
@@ -27,24 +26,21 @@ import (
        "github.com/apache/incubator-devlake/logger"
        "github.com/apache/incubator-devlake/models"
        "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/go-playground/validator/v10"
        "github.com/robfig/cron/v3"
-       "gorm.io/gorm"
 )
 
 // BlueprintQuery is a query for GetBlueprints
 type BlueprintQuery struct {
+       Pagination
        Enable   *bool  `form:"enable,omitempty"`
        IsManual *bool  `form:"is_manual"`
-       Page     int    `form:"page"`
-       PageSize int    `form:"pageSize"`
        Label    string `form:"label"`
 }
 
 var (
        blueprintLog = logger.Global.Nested("blueprint")
-       vld          = validator.New()
 )
 
 // CreateBlueprint accepts a Blueprint instance and insert it to database
@@ -92,7 +88,7 @@ func GetBlueprints(query *BlueprintQuery) 
([]*models.Blueprint, int64, errors.Er
 func GetBlueprint(blueprintId uint64) (*models.Blueprint, errors.Error) {
        dbBlueprint, err := GetDbBlueprint(blueprintId)
        if err != nil {
-               if goerror.Is(err, gorm.ErrRecordNotFound) {
+               if db.IsErrorNotFound(err) {
                        return nil, errors.NotFound.New("blueprint not found")
                }
                return nil, errors.Internal.Wrap(err, "error getting the 
blueprint from database")
@@ -113,7 +109,7 @@ func GetBlueprintByProjectName(projectName string) 
(*models.Blueprint, errors.Er
        dbBlueprint, err := GetDbBlueprintByProjectName(projectName)
        if err != nil {
                // Allow specific projectName to fail to find the corresponding 
blueprint
-               if goerror.Is(err, gorm.ErrRecordNotFound) {
+               if db.IsErrorNotFound(err) {
                        return nil, nil
                }
                return nil, errors.Internal.Wrap(err, fmt.Sprintf("error 
getting the blueprint from database with project %s", projectName))
@@ -211,27 +207,6 @@ func saveBlueprint(blueprint *models.Blueprint) 
(*models.Blueprint, errors.Error
        return blueprint, nil
 }
 
-// PatchBlueprintEnableByProjectName FIXME ...
-func PatchBlueprintEnableByProjectName(projectName string, enable bool) 
(*models.Blueprint, errors.Error) {
-       blueprint, err := GetBlueprintByProjectName(projectName)
-       if err != nil {
-               return nil, err
-       }
-
-       if blueprint == nil {
-               return nil, errors.Default.New(fmt.Sprintf("do not surpport to 
set enable for projectName:[%s] ,because it has no blueprint.", projectName))
-       }
-
-       blueprint.Enable = enable
-
-       blueprint, err = saveBlueprint(blueprint)
-       if err != nil {
-               return nil, err
-       }
-
-       return blueprint, nil
-}
-
 // PatchBlueprint FIXME ...
 func PatchBlueprint(id uint64, body map[string]interface{}) 
(*models.Blueprint, errors.Error) {
        // load record from db
@@ -258,19 +233,6 @@ func PatchBlueprint(id uint64, body 
map[string]interface{}) (*models.Blueprint,
        return blueprint, nil
 }
 
-// DeleteBlueprint FIXME ...
-func DeleteBlueprint(id uint64) errors.Error {
-       err := DeleteDbBlueprint(id)
-       if err != nil {
-               return errors.Internal.Wrap(err, fmt.Sprintf("error deleting 
blueprint %d", id))
-       }
-       err = ReloadBlueprints(cronManager)
-       if err != nil {
-               return errors.Internal.Wrap(err, "error reloading blueprints")
-       }
-       return nil
-}
-
 // ReloadBlueprints FIXME ...
 func ReloadBlueprints(c *cron.Cron) errors.Error {
        enable := true
@@ -358,9 +320,11 @@ func MakePlanForBlueprint(blueprint *models.Blueprint) 
(core.PipelinePlan, error
                // load project metric plugins and convert it to a map
                metrics := make(map[string]json.RawMessage)
                projectMetrics := make([]models.ProjectMetric, 0)
-
                if blueprint.ProjectName != "" {
-                       db.Find(&projectMetrics, "project_name = ? AND enable = 
?", blueprint.ProjectName, true)
+                       err = db.All(&projectMetrics, dal.Where("project_name = 
? AND enable = ?", blueprint.ProjectName, true))
+                       if err != nil {
+                               return nil, err
+                       }
                        for _, projectMetric := range projectMetrics {
                                metrics[projectMetric.PluginName] = 
json.RawMessage(projectMetric.PluginOption)
                        }
diff --git a/services/blueprint_helper.go b/services/blueprint_helper.go
index 1d8c07067..facd5f7b3 100644
--- a/services/blueprint_helper.go
+++ b/services/blueprint_helper.go
@@ -18,28 +18,27 @@ limitations under the License.
 package services
 
 import (
-       goerror "errors"
        "fmt"
 
        "github.com/apache/incubator-devlake/config"
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models"
        "github.com/apache/incubator-devlake/plugins/core"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
 // SaveDbBlueprint accepts a Blueprint instance and upsert it to database
 func SaveDbBlueprint(dbBlueprint *models.DbBlueprint) errors.Error {
        var err error
        if dbBlueprint.ID != 0 {
-               err = db.Save(&dbBlueprint).Error
+               err = db.Update(&dbBlueprint)
        } else {
-               err = db.Create(&dbBlueprint).Error
+               err = db.Create(&dbBlueprint)
        }
        if err != nil {
                return errors.Default.Wrap(err, "error creating DB blueprint")
        }
-       err = db.Delete(&models.DbBlueprintLabel{}, `blueprint_id = ?`, 
dbBlueprint.ID).Error
+       err = db.Delete(&models.DbBlueprintLabel{}, dal.Where(`blueprint_id = 
?`, dbBlueprint.ID))
        if err != nil {
                return errors.Default.Wrap(err, "error delete DB blueprint's 
old labelModels")
        }
@@ -47,7 +46,7 @@ func SaveDbBlueprint(dbBlueprint *models.DbBlueprint) 
errors.Error {
                for i := range dbBlueprint.Labels {
                        dbBlueprint.Labels[i].BlueprintId = dbBlueprint.ID
                }
-               err = db.Create(&dbBlueprint.Labels).Error
+               err = db.Create(&dbBlueprint.Labels)
                if err != nil {
                        return errors.Default.Wrap(err, "error creating DB 
blueprint's labelModels")
                }
@@ -57,45 +56,45 @@ func SaveDbBlueprint(dbBlueprint *models.DbBlueprint) 
errors.Error {
 
 // GetDbBlueprints returns a paginated list of Blueprints based on `query`
 func GetDbBlueprints(query *BlueprintQuery) ([]*models.DbBlueprint, int64, 
errors.Error) {
-       dbBlueprints := make([]*models.DbBlueprint, 0)
-       dbQuery := db.Model(dbBlueprints).Order("id DESC")
+       // process query parameters
+       clauses := []dal.Clause{dal.From(&models.DbBlueprint{})}
        if query.Enable != nil {
-               dbQuery = dbQuery.Where("enable = ?", *query.Enable)
+               clauses = append(clauses, dal.Where("enable = ?", 
*query.Enable))
        }
        if query.IsManual != nil {
-               dbQuery = dbQuery.Where("is_manual = ?", *query.IsManual)
+               clauses = append(clauses, dal.Where("is_manual = ?", 
*query.IsManual))
        }
        if query.Label != "" {
-               dbQuery = dbQuery.
-                       Joins(`left join _devlake_blueprint_labels ON 
_devlake_blueprint_labels.blueprint_id = _devlake_blueprints.id`).
-                       Where(`_devlake_blueprint_labels.name = ?`, query.Label)
+               clauses = append(clauses,
+                       dal.Join("left join _devlake_blueprint_labels bl ON 
bl.blueprint_id = _devlake_blueprints.id"),
+                       dal.Where("bl.name = ?", query.Label),
+               )
        }
 
-       var count int64
-       err := dbQuery.Count(&count).Error
+       // count total records
+       count, err := db.Count(clauses...)
        if err != nil {
-               return nil, 0, errors.Default.Wrap(err, "error getting DB count 
of blueprints")
+               return nil, 0, err
        }
 
-       dbQuery = processDbClausesWithPager(dbQuery, query.PageSize, query.Page)
-
-       err = dbQuery.Find(&dbBlueprints).Error
+       // load paginated blueprints from database
+       clauses = append(clauses,
+               dal.Orderby("id DESC"),
+               dal.Offset(query.GetSkip()),
+               dal.Limit(query.GetPageSize()),
+       )
+       dbBlueprints := make([]*models.DbBlueprint, 0)
+       err = db.All(&dbBlueprints, clauses...)
        if err != nil {
-               return nil, 0, errors.Default.Wrap(err, "error finding DB 
blueprints")
+               return nil, 0, errors.Default.Wrap(err, "error getting DB count 
of blueprints")
        }
 
-       var blueprintIds []uint64
-       for _, dbBlueprint := range dbBlueprints {
-               blueprintIds = append(blueprintIds, dbBlueprint.ID)
-       }
-       var dbLabels []models.DbBlueprintLabel
-       dbLabelsMap := map[uint64][]models.DbBlueprintLabel{}
-       db.Where(`blueprint_id in ?`, blueprintIds).Find(&dbLabels)
-       for _, dbLabel := range dbLabels {
-               dbLabelsMap[dbLabel.BlueprintId] = 
append(dbLabelsMap[dbLabel.BlueprintId], dbLabel)
-       }
+       // load labels for blueprints
        for _, dbBlueprint := range dbBlueprints {
-               dbBlueprint.Labels = dbLabelsMap[dbBlueprint.ID]
+               err = fillBlueprintDetail(dbBlueprint)
+               if err != nil {
+                       return nil, 0, err
+               }
        }
 
        return dbBlueprints, count, nil
@@ -104,16 +103,16 @@ func GetDbBlueprints(query *BlueprintQuery) 
([]*models.DbBlueprint, int64, error
 // GetDbBlueprint returns the detail of a given Blueprint ID
 func GetDbBlueprint(dbBlueprintId uint64) (*models.DbBlueprint, errors.Error) {
        dbBlueprint := &models.DbBlueprint{}
-       err := db.First(dbBlueprint, dbBlueprintId).Error
+       err := db.First(dbBlueprint, dal.Where("id = ?", dbBlueprintId))
        if err != nil {
-               if goerror.Is(err, gorm.ErrRecordNotFound) {
+               if db.IsErrorNotFound(err) {
                        return nil, errors.NotFound.Wrap(err, "could not find 
blueprint in DB")
                }
                return nil, errors.Default.Wrap(err, "error getting blueprint 
from DB")
        }
-       err = db.Find(&dbBlueprint.Labels, "blueprint_id = ?", 
dbBlueprint.ID).Error
+       err = fillBlueprintDetail(dbBlueprint)
        if err != nil {
-               return nil, errors.Internal.Wrap(err, "error getting the 
blueprint labels from database")
+               return nil, err
        }
        return dbBlueprint, nil
 }
@@ -121,43 +120,20 @@ func GetDbBlueprint(dbBlueprintId uint64) 
(*models.DbBlueprint, errors.Error) {
 // GetDbBlueprintByProjectName returns the detail of a given projectName
 func GetDbBlueprintByProjectName(projectName string) (*models.DbBlueprint, 
errors.Error) {
        dbBlueprint := &models.DbBlueprint{}
-       err := db.Where("project_name = ?", 
projectName).First(dbBlueprint).Error
+       err := db.First(dbBlueprint, dal.Where("project_name = ?", projectName))
        if err != nil {
-               if goerror.Is(err, gorm.ErrRecordNotFound) {
+               if db.IsErrorNotFound(err) {
                        return nil, errors.NotFound.Wrap(err, 
fmt.Sprintf("could not find blueprint in DB by projectName %s", projectName))
                }
                return nil, errors.Default.Wrap(err, fmt.Sprintf("error getting 
blueprint from DB by projectName %s", projectName))
        }
-       err = db.Find(&dbBlueprint.Labels, "blueprint_id = ?", 
dbBlueprint.ID).Error
+       err = fillBlueprintDetail(dbBlueprint)
        if err != nil {
-               return nil, errors.Internal.Wrap(err, "error getting the 
blueprint labels from database")
+               return nil, err
        }
        return dbBlueprint, nil
 }
 
-// RenameProjectNameForBlueprint FIXME ...
-func RenameProjectNameForBlueprint(oldProjectName string, newProjectName 
string) errors.Error {
-       err := db.Model(&models.DbBlueprint{}).
-               Where("project_name = ?", oldProjectName).
-               Updates(map[string]interface{}{
-                       "project_name": newProjectName,
-               }).Error
-       if err != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("Failed to 
RenameProjectNameForBlueprint from [%s] to [%s]", oldProjectName, 
newProjectName))
-       }
-
-       return nil
-}
-
-// DeleteDbBlueprint deletes blueprint by id
-func DeleteDbBlueprint(id uint64) errors.Error {
-       err := db.Delete(&models.DbBlueprint{}, "id = ?", id).Error
-       if err != nil {
-               return errors.Default.Wrap(err, "error deleting blueprint from 
DB")
-       }
-       return nil
-}
-
 // parseBlueprint
 func parseBlueprint(dbBlueprint *models.DbBlueprint) *models.Blueprint {
        labelList := []string{}
@@ -236,3 +212,11 @@ func decryptDbBlueprint(dbBlueprint *models.DbBlueprint) 
(*models.DbBlueprint, e
        }
        return dbBlueprint, nil
 }
+
+func fillBlueprintDetail(blueprint *models.DbBlueprint) errors.Error {
+       err := db.All(&blueprint.Labels, dal.Where("blueprint_id = ?", 
blueprint.ID))
+       if err != nil {
+               return errors.Internal.Wrap(err, "error getting the blueprint 
labels from database")
+       }
+       return nil
+}
diff --git a/services/blueprint_makeplan_v200.go 
b/services/blueprint_makeplan_v200.go
index 904d6ef6f..eed7b53a2 100644
--- a/services/blueprint_makeplan_v200.go
+++ b/services/blueprint_makeplan_v200.go
@@ -20,6 +20,7 @@ package services
 import (
        "encoding/json"
        "fmt"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models"
        "github.com/apache/incubator-devlake/models/domainlayer/crossdomain"
@@ -39,18 +40,19 @@ func GeneratePlanJsonV200(
        if err != nil {
                return nil, err
        }
-       // refresh project_mapping table to reflect project/scopes relationship
+       // save scopes to database
        if len(scopes) > 0 {
                for _, scope := range scopes {
-                       e := basicRes.GetDal().CreateOrUpdate(scope)
-                       if e != nil {
+                       err = db.CreateOrUpdate(scope)
+                       if err != nil {
                                scopeInfo := 
fmt.Sprintf("[Id:%s][Name:%s][TableName:%s]", scope.ScopeId(), 
scope.ScopeName(), scope.TableName())
-                               return nil, errors.Default.Wrap(e, 
fmt.Sprintf("failed to create scopes:[%s]", scopeInfo))
+                               return nil, errors.Default.Wrap(err, 
fmt.Sprintf("failed to create scopes:[%s]", scopeInfo))
                        }
                }
        }
+       // refresh project_mapping table to reflect project/scopes relationship
        if len(projectName) != 0 {
-               err = basicRes.GetDal().Delete(&crossdomain.ProjectMapping{}, 
dal.Where("project_name = ?", projectName))
+               err = db.Delete(&crossdomain.ProjectMapping{}, 
dal.Where("project_name = ?", projectName))
                if err != nil {
                        return nil, err
                }
@@ -60,7 +62,7 @@ func GeneratePlanJsonV200(
                                Table:       scope.TableName(),
                                RowId:       scope.ScopeId(),
                        }
-                       err = basicRes.GetDal().CreateIfNotExist(projectMapping)
+                       err = db.Create(projectMapping)
                        if err != nil {
                                return nil, err
                        }
diff --git a/services/dm_code.go b/services/dm_code.go
index 78540d365..b747b0c4a 100644
--- a/services/dm_code.go
+++ b/services/dm_code.go
@@ -20,20 +20,12 @@ package services
 import (
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models/domainlayer/code"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
 // GetRepos FIXME ...
 func GetRepos() ([]*code.Repo, int64, errors.Error) {
        repos := make([]*code.Repo, 0)
-       db := db.Model(repos).Order("id DESC")
-       var count int64
-       err := db.Count(&count).Error
-       if err != nil {
-               return nil, 0, errors.Convert(err)
-       }
-       err = db.Find(&repos).Error
-       if err != nil {
-               return nil, count, errors.Convert(err)
-       }
-       return repos, count, nil
+       err := db.All(&repos, dal.Orderby("id DESC"))
+       return repos, int64(len(repos)), err
 }
diff --git a/services/init.go b/services/init.go
index 12f7bece3..c51808cce 100644
--- a/services/init.go
+++ b/services/init.go
@@ -23,8 +23,8 @@ import (
        "sync"
 
        "github.com/apache/incubator-devlake/errors"
+       "github.com/go-playground/validator/v10"
 
-       "github.com/apache/incubator-devlake/config"
        "github.com/apache/incubator-devlake/impl/dalgorm"
        "github.com/apache/incubator-devlake/logger"
        "github.com/apache/incubator-devlake/models/migrationscripts"
@@ -32,38 +32,33 @@ import (
        "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/runner"
        "github.com/robfig/cron/v3"
-       "gorm.io/gorm"
 )
 
 var cfg core.ConfigReader
 var log core.Logger
-var db *gorm.DB
+var db dal.Dal
 var basicRes core.BasicRes
 var migrator core.Migrator
 var cronManager *cron.Cron
 var cronLocker sync.Mutex
+var vld *validator.Validate
 
 const failToCreateCronJob = "created cron job failed"
 
 // Init the services module
 func Init() {
        var err error
-       cfg = config.GetConfig()
-       log = logger.Global
-       db, err = runner.NewGormDb(cfg, log)
-       if err != nil {
-               panic(err)
-       }
 
-       // TODO: this is ugly, the lockDb / CreateAppBasicRes are coupled via 
global variables cfg/log
-       // it is too much for this refactor, let's solve it later
+       // basic resources initialization
+       vld = validator.New()
+       basicRes = runner.CreateAppBasicRes()
+       cfg = basicRes.GetConfigReader()
+       log = basicRes.GetLogger()
+       db = basicRes.GetDal()
 
        // lock the database to avoid multiple devlake instances from sharing 
the same one
        lockDb()
 
-       // basic resources initialization
-       basicRes = runner.CreateBasicRes(cfg, log, db)
-
        // initialize db migrator
        migrator, err = runner.InitMigrator(basicRes)
        if err != nil {
@@ -72,17 +67,20 @@ func Init() {
        log.Info("migration initialized")
        migrator.Register(migrationscripts.All(), "Framework")
 
-       // now,
-       // load plugins
+       // now, load the plugins
        err = runner.LoadPlugins(basicRes)
        if err != nil {
                panic(err)
        }
+
+       // pull migration scripts from plugins to migrator
        for pluginName, pluginInst := range core.AllPlugins() {
                if migratable, ok := pluginInst.(core.PluginMigration); ok {
                        migrator.Register(migratable.MigrationScripts(), 
pluginName)
                }
        }
+
+       // check if there are pending migrations
        forceMigration := cfg.GetBool("FORCE_MIGRATION")
        if !migrator.HasPendingScripts() || forceMigration {
                err = ExecuteMigration()
@@ -107,7 +105,8 @@ func ExecuteMigration() errors.Error {
        if err != nil {
                panic(err)
        }
-       // call service init
+
+       // initialize pipeline server, mainly to start the pipeline consuming 
process
        pipelineServiceInit()
        return nil
 }
diff --git a/services/notification.go b/services/notification.go
index a067a9e36..5c1c6495b 100644
--- a/services/notification.go
+++ b/services/notification.go
@@ -22,13 +22,14 @@ import (
        "encoding/hex"
        "encoding/json"
        "fmt"
-       "github.com/apache/incubator-devlake/errors"
        "io"
        "math/rand"
        "net/http"
        "strings"
        "time"
 
+       "github.com/apache/incubator-devlake/errors"
+
        "github.com/apache/incubator-devlake/models"
 )
 
@@ -78,7 +79,7 @@ func (n *NotificationService) 
sendNotification(notificationType models.Notificat
        nonce := randSeq(16)
        notification.Nonce = nonce
 
-       err = db.Save(&notification).Error
+       err = db.Create(&notification)
        if err != nil {
                return errors.Convert(err)
        }
@@ -97,7 +98,7 @@ func (n *NotificationService) 
sendNotification(notificationType models.Notificat
                return errors.Convert(err)
        }
        notification.Response = string(respBody)
-       return errors.Convert(db.Save(&notification).Error)
+       return db.Update(notification)
 }
 
 func (n *NotificationService) signature(input, nouce string) string {
diff --git a/services/page_helper.go b/services/page_helper.go
deleted file mode 100644
index 6c80705ba..000000000
--- a/services/page_helper.go
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package services
-
-import "gorm.io/gorm"
-
-const maxPageSize = 100
-
-func processDbClausesWithPager(tx *gorm.DB, pageSize int, page int) *gorm.DB {
-       if pageSize <= 0 || pageSize > maxPageSize {
-               pageSize = maxPageSize
-       }
-       tx = tx.Limit(pageSize)
-
-       if page > 0 {
-               offset := pageSize * (page - 1)
-               tx = tx.Offset(offset)
-       }
-       return tx
-}
diff --git a/services/pipeline.go b/services/pipeline.go
index 1ba8f2a75..d94daeb04 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -29,6 +29,8 @@ import (
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/logger"
        "github.com/apache/incubator-devlake/models"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/utils"
        "github.com/google/uuid"
        v11 "go.temporal.io/api/enums/v1"
@@ -43,10 +45,9 @@ var globalPipelineLog = logger.Global.Nested("pipeline 
service")
 
 // PipelineQuery is a query for GetPipelines
 type PipelineQuery struct {
+       Pagination
        Status      string `form:"status"`
        Pending     int    `form:"pending"`
-       Page        int    `form:"page"`
-       PageSize    int    `form:"pageSize"`
        BlueprintId uint64 `uri:"blueprintId" form:"blueprint_id"`
        Label       string `form:"label"`
 }
@@ -73,8 +74,14 @@ func pipelineServiceInit() {
                watchTemporalPipelines()
        } else {
                // standalone mode: reset pipeline status
-               db.Model(&models.DbPipeline{}).Where("status <> ?", 
models.TASK_COMPLETED).Update("status", models.TASK_FAILED)
-               db.Model(&models.Task{}).Where("status <> ?", 
models.TASK_COMPLETED).Update("status", models.TASK_FAILED)
+               err := db.UpdateColumn(
+                       &models.DbPipeline{},
+                       "status", models.TASK_FAILED,
+                       dal.Where("status = ?", models.TASK_RUNNING),
+               )
+               if err != nil {
+                       panic(err)
+               }
        }
 
        err := ReloadBlueprints(cronManager)
@@ -159,6 +166,7 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
        sema := semaphore.NewWeighted(pipelineMaxParallel)
        runningParallelLabels := []string{}
        var runningParallelLabelLock sync.Mutex
+       dbPipeline := &models.DbPipeline{}
        for {
                globalPipelineLog.Info("acquire lock")
                // start goroutine when sema lock ready and pipeline exist.
@@ -168,31 +176,45 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
                        panic(err)
                }
                globalPipelineLog.Info("get lock and wait next pipeline")
-               dbPipeline := &models.DbPipeline{}
                for {
                        cronLocker.Lock()
                        // prepare query to find an appropriate pipeline to 
execute
-                       db.Where("status IN ?", []string{models.TASK_CREATED, 
models.TASK_RERUN}).
-                               Joins(`left join _devlake_pipeline_labels ON
+                       err := db.First(dbPipeline,
+                               dal.Where("status IN ?", 
[]string{models.TASK_CREATED, models.TASK_RERUN}),
+                               dal.Join(
+                                       `left join _devlake_pipeline_labels ON
                                                
_devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND
                                                _devlake_pipeline_labels.name 
LIKE 'parallel/%' AND
-                                               _devlake_pipeline_labels.name 
in ?`, runningParallelLabels).
-                               Group(`id`).
-                               
Having(`count(_devlake_pipeline_labels.name)=0`).
-                               Select("id").
-                               Order("id ASC").Limit(1).Find(dbPipeline)
+                                               _devlake_pipeline_labels.name 
in ?`,
+                                       runningParallelLabels,
+                               ),
+                               dal.Groupby("id"),
+                               
dal.Having("count(_devlake_pipeline_labels.name)=0"),
+                               dal.Select("id"),
+                               dal.Orderby("id ASC"),
+                               dal.Limit(1),
+                       )
+                       if err != nil && !db.IsErrorNotFound(err) {
+                               globalPipelineLog.Error(err, "dequeue failed")
+                       }
                        cronLocker.Unlock()
-                       if dbPipeline.ID != 0 {
+                       if !db.IsErrorNotFound(err) {
                                break
                        }
                        time.Sleep(time.Second)
                }
 
-               db.Model(&models.DbPipeline{}).Where("id = ?", 
dbPipeline.ID).Updates(map[string]interface{}{
-                       "status":   models.TASK_RUNNING,
-                       "message":  "",
-                       "began_at": time.Now(),
-               })
+               // mark the pipeline running
+               err = db.UpdateColumns(&models.DbPipeline{}, []dal.DalSet{
+                       {ColumnName: "status", Value: models.TASK_RUNNING},
+                       {ColumnName: "message", Value: ""},
+                       {ColumnName: "began_at", Value: time.Now()},
+               }, dal.Where("id = ?", dbPipeline.ID))
+               if err != nil {
+                       panic(err)
+               }
+
+               // load pipeline
                dbPipeline, err = GetDbPipeline(dbPipeline.ID)
                if err != nil {
                        panic(err)
@@ -234,7 +256,7 @@ func watchTemporalPipelines() {
                for range ticker.C {
                        // load all running pipeline from database
                        runningDbPipelines := make([]models.DbPipeline, 0)
-                       err := db.Find(&runningDbPipelines, "status = ?", 
models.TASK_RUNNING).Error
+                       err := db.All(&runningDbPipelines, dal.Where("status = 
?", models.TASK_RUNNING))
                        if err != nil {
                                panic(err)
                        }
@@ -277,11 +299,11 @@ func watchTemporalPipelines() {
                                                }
                                        }
                                        rp.FinishedAt = 
desc.WorkflowExecutionInfo.CloseTime
-                                       err = 
db.Model(rp).Updates(map[string]interface{}{
-                                               "status":      rp.Status,
-                                               "message":     rp.Message,
-                                               "finished_at": rp.FinishedAt,
-                                       }).Error
+                                       err = db.UpdateColumns(rp, []dal.DalSet{
+                                               {ColumnName: "status", Value: 
rp.Status},
+                                               {ColumnName: "message", Value: 
rp.Message},
+                                               {ColumnName: "finished_at", 
Value: rp.FinishedAt},
+                                       })
                                        if err != nil {
                                                globalPipelineLog.Error(err, 
"failed to update db: %v", err)
                                        }
@@ -352,23 +374,25 @@ func CancelPipeline(pipelineId uint64) errors.Error {
        cronLocker.Lock()
        defer cronLocker.Unlock()
        pipeline := &models.DbPipeline{}
-       err := db.First(pipeline, pipelineId).Error
+       err := db.First(pipeline, dal.Where("id = ?", pipelineId))
        if err != nil {
                return errors.BadInput.New("pipeline not found")
        }
        if pipeline.Status == models.TASK_CREATED || pipeline.Status == 
models.TASK_RERUN {
                pipeline.Status = models.TASK_CANCELLED
-               result := db.Save(pipeline)
-               if result.Error != nil {
-                       return errors.Default.Wrap(result.Error, "faile to 
update pipeline")
+               err = db.Update(pipeline)
+               if err != nil {
+                       return errors.Default.Wrap(err, "faile to update 
pipeline")
                }
                // now, with RunPipelineInQueue being block and target pipeline 
got updated
                // we should update the related tasks as well
-               result = db.Model(&models.Task{}).
-                       Where("pipeline_id = ?", pipelineId).
-                       Update("status", models.TASK_CANCELLED)
-               if result.Error != nil {
-                       return errors.Default.Wrap(result.Error, "faile to 
update pipeline tasks")
+               err = db.UpdateColumn(
+                       &models.Task{},
+                       "status", models.TASK_CANCELLED,
+                       dal.Where("pipeline_id = ?", pipelineId),
+               )
+               if err != nil {
+                       return errors.Default.Wrap(err, "faile to update 
pipeline tasks")
                }
                // the target pipeline is pending, no running, no need to 
perform the actual cancel operation
                return nil
@@ -376,7 +400,7 @@ func CancelPipeline(pipelineId uint64) errors.Error {
        if temporalClient != nil {
                return 
errors.Convert(temporalClient.CancelWorkflow(context.Background(), 
getTemporalWorkflowId(pipelineId), ""))
        }
-       pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, 
Pending: 1, PageSize: -1})
+       pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, 
Pending: 1, Pagination: Pagination{PageSize: -1}})
        if err != nil {
                return errors.Convert(err)
        }
@@ -402,3 +426,95 @@ func getPipelineLogsPath(pipeline *models.Pipeline) 
(string, errors.Error) {
        }
        return "", errors.Default.Wrap(err, fmt.Sprintf("error validating logs 
path for pipeline #%d", pipeline.ID))
 }
+
+// RerunPipeline would rerun all failed tasks or specified task
+func RerunPipeline(pipelineId uint64, task *models.Task) ([]*models.Task, 
errors.Error) {
+       // prevent pipeline executor from doing anything that might jeopardize 
the integrity
+       cronLocker.Lock()
+       defer cronLocker.Unlock()
+
+       // load the pipeline
+       pipeline, err := GetPipeline(pipelineId)
+       if err != nil {
+               return nil, err
+       }
+
+       // verify the status
+       if pipeline.Status == models.TASK_RUNNING {
+               return nil, errors.BadInput.New("pipeline is running")
+       }
+       if pipeline.Status == models.TASK_CREATED || pipeline.Status == 
models.TASK_RERUN {
+               return nil, errors.BadInput.New("pipeline is waiting to run")
+       }
+
+       // determine which tasks to rerun
+       var failedTasks []*models.Task
+       if task != nil {
+               if task.PipelineId != pipelineId {
+                       return nil, errors.BadInput.New("the task ID and 
pipeline ID doesn't match")
+               }
+               failedTasks = append(failedTasks, task)
+       } else {
+               tasks, err := GetTasksWithLastStatus(pipelineId)
+               if err != nil {
+                       return nil, errors.Default.Wrap(err, "error getting 
tasks")
+               }
+               for _, t := range tasks {
+                       if t.Status != models.TASK_COMPLETED {
+                               failedTasks = append(failedTasks, t)
+                       }
+               }
+       }
+
+       // no tasks to rerun
+       if len(failedTasks) == 0 {
+               return nil, errors.BadInput.New("no tasks to be re-ran")
+       }
+
+       // create new tasks
+       // TODO: this is better to be wrapped inside a transaction
+       rerunTasks := []*models.Task{}
+       for _, t := range failedTasks {
+               // mark previous task failed
+               t.Status = models.TASK_FAILED
+               err := db.UpdateColumn(t, "status", models.TASK_FAILED)
+               if err != nil {
+                       return nil, err
+               }
+               // create new task
+               subtasks, err := t.GetSubTasks()
+               if err != nil {
+                       return nil, err
+               }
+               options, err := t.GetOptions()
+               if err != nil {
+                       return nil, err
+               }
+               rerunTask, err := CreateTask(&models.NewTask{
+                       PipelineTask: &core.PipelineTask{
+                               Plugin:   t.Plugin,
+                               Subtasks: subtasks,
+                               Options:  options,
+                       },
+                       PipelineId:  t.PipelineId,
+                       PipelineRow: t.PipelineRow,
+                       PipelineCol: t.PipelineCol,
+                       IsRerun:     true,
+               })
+               if err != nil {
+                       return nil, err
+               }
+               // append to result
+               rerunTasks = append(rerunTasks, rerunTask)
+       }
+
+       // mark pipeline rerun
+       err = db.UpdateColumn(&models.DbPipeline{},
+               "status", models.TASK_RERUN,
+               dal.Where("id = ?", pipelineId),
+       )
+       if err != nil {
+               return nil, err
+       }
+       return rerunTasks, nil
+}
diff --git a/services/pipeline_helper.go b/services/pipeline_helper.go
index d9300e76b..4aea9279e 100644
--- a/services/pipeline_helper.go
+++ b/services/pipeline_helper.go
@@ -19,14 +19,12 @@ package services
 
 import (
        "encoding/json"
-       goerror "errors"
        "fmt"
 
-       "github.com/apache/incubator-devlake/config"
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models"
        "github.com/apache/incubator-devlake/plugins/core"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
 // ErrBlueprintRunning indicates there is a running pipeline with the 
specified blueprint_id
@@ -37,8 +35,10 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) 
(*models.DbPipeline, erro
        cronLocker.Lock()
        defer cronLocker.Unlock()
        if newPipeline.BlueprintId > 0 {
-               var count int64
-               err := db.Model(&models.DbPipeline{}).Where("blueprint_id = ? 
AND status IN ?", newPipeline.BlueprintId, 
models.PendingTaskStatus).Count(&count).Error
+               count, err := db.Count(
+                       dal.From(&models.DbPipeline{}),
+                       dal.Where("blueprint_id = ? AND status IN ?", 
newPipeline.BlueprintId, models.PendingTaskStatus),
+               )
                if err != nil {
                        return nil, errors.Default.Wrap(err, "query pipelines 
error")
                }
@@ -69,7 +69,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) 
(*models.DbPipeline, erro
        }
 
        // save pipeline to database
-       if err := db.Create(&dbPipeline).Error; err != nil {
+       if err := db.Create(&dbPipeline); err != nil {
                globalPipelineLog.Error(err, "create pipeline failed: %v", err)
                return nil, errors.Internal.Wrap(err, "create pipeline failed")
        }
@@ -82,7 +82,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) 
(*models.DbPipeline, erro
                })
        }
        if len(dbPipeline.Labels) > 0 {
-               if err := db.Create(&dbPipeline.Labels).Error; err != nil {
+               if err := db.Create(&dbPipeline.Labels); err != nil {
                        globalPipelineLog.Error(err, "create pipeline's 
labelModels failed: %v", err)
                        return nil, errors.Internal.Wrap(err, "create 
pipeline's labelModels failed")
                }
@@ -117,9 +117,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) 
(*models.DbPipeline, erro
        }
 
        // update tasks state
-       if err := db.Model(dbPipeline).Updates(map[string]interface{}{
-               "total_tasks": dbPipeline.TotalTasks,
-       }).Error; err != nil {
+       if err := db.Update(dbPipeline); err != nil {
                globalPipelineLog.Error(err, "update pipline state failed: %v", 
err)
                return nil, errors.Internal.Wrap(err, "update pipline state 
failed")
        }
@@ -129,47 +127,48 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) 
(*models.DbPipeline, erro
 
 // GetDbPipelines by query
 func GetDbPipelines(query *PipelineQuery) ([]*models.DbPipeline, int64, 
errors.Error) {
-       dbPipelines := make([]*models.DbPipeline, 0)
-       dbQuery := db.Model(dbPipelines).Order("id DESC")
+       // process query parameters
+       clauses := []dal.Clause{dal.From(&models.DbPipeline{})}
        if query.BlueprintId != 0 {
-               dbQuery = dbQuery.Where("blueprint_id = ?", query.BlueprintId)
+               clauses = append(clauses, dal.Where("blueprint_id = ?", 
query.BlueprintId))
        }
        if query.Status != "" {
-               dbQuery = dbQuery.Where("status = ?", query.Status)
+               clauses = append(clauses, dal.Where("status = ?", query.Status))
        }
        if query.Pending > 0 {
-               dbQuery = dbQuery.Where("finished_at is null and status IN ?", 
models.PendingTaskStatus)
+               clauses = append(clauses, dal.Where("finished_at is null and 
status IN ?", models.PendingTaskStatus))
        }
        if query.Label != "" {
-               dbQuery = dbQuery.
-                       Joins(`left join _devlake_pipeline_labels ON 
_devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id`).
-                       Where(`_devlake_pipeline_labels.name = ?`, query.Label)
+               clauses = append(clauses,
+                       dal.Join("LEFT JOIN _devlake_pipeline_labels bl ON 
bl.pipeline_id = _devlake_pipelines.id"),
+                       dal.Where("bl.name = ?", query.Label),
+               )
        }
-       var count int64
-       err := dbQuery.Count(&count).Error
+
+       // count total records
+       count, err := db.Count(clauses...)
        if err != nil {
-               return nil, 0, errors.Default.Wrap(err, "error getting DB 
pipelines count")
+               return nil, 0, err
        }
 
-       dbQuery = processDbClausesWithPager(dbQuery, query.PageSize, query.Page)
-
-       err = dbQuery.Find(&dbPipelines).Error
+       // load paginated pipelines from database
+       clauses = append(clauses,
+               dal.Orderby("id DESC"),
+               dal.Offset(query.GetSkip()),
+               dal.Limit(query.GetPageSize()),
+       )
+       dbPipelines := make([]*models.DbPipeline, 0)
+       err = db.All(&dbPipelines, clauses...)
        if err != nil {
-               return nil, count, errors.Default.Wrap(err, "error finding DB 
pipelines")
+               return nil, 0, errors.Default.Wrap(err, "error getting DB count 
of pipelines")
        }
 
-       var pipelineIds []uint64
+       // load labels for pipelines
        for _, dbPipeline := range dbPipelines {
-               pipelineIds = append(pipelineIds, dbPipeline.ID)
-       }
-       dbLabels := []models.DbPipelineLabel{}
-       db.Where(`pipeline_id in ?`, pipelineIds).Find(&dbLabels)
-       dbLabelsMap := map[uint64][]models.DbPipelineLabel{}
-       for _, dbLabel := range dbLabels {
-               dbLabelsMap[dbLabel.PipelineId] = 
append(dbLabelsMap[dbLabel.PipelineId], dbLabel)
-       }
-       for _, dbPipeline := range dbPipelines {
-               dbPipeline.Labels = dbLabelsMap[dbPipeline.ID]
+               err = fillPipelineDetail(dbPipeline)
+               if err != nil {
+                       return nil, 0, err
+               }
        }
 
        return dbPipelines, count, nil
@@ -178,16 +177,16 @@ func GetDbPipelines(query *PipelineQuery) 
([]*models.DbPipeline, int64, errors.E
 // GetDbPipeline by id
 func GetDbPipeline(pipelineId uint64) (*models.DbPipeline, errors.Error) {
        dbPipeline := &models.DbPipeline{}
-       err := db.First(dbPipeline, pipelineId).Error
+       err := db.First(dbPipeline, dal.Where("id = ?", pipelineId))
        if err != nil {
-               if goerror.Is(err, gorm.ErrRecordNotFound) {
+               if db.IsErrorNotFound(err) {
                        return nil, errors.NotFound.New("pipeline not found")
                }
                return nil, errors.Internal.Wrap(err, "error getting the 
pipeline from database")
        }
-       err = db.Find(&dbPipeline.Labels, "pipeline_id = ?", pipelineId).Error
+       err = fillPipelineDetail(dbPipeline)
        if err != nil {
-               return nil, errors.Internal.Wrap(err, "error getting the 
pipeline from database")
+               return nil, err
        }
        return dbPipeline, nil
 }
@@ -248,7 +247,7 @@ func parseDbPipeline(pipeline *models.Pipeline) 
*models.DbPipeline {
 
 // encryptDbPipeline encrypts dbPipeline.Plan
 func encryptDbPipeline(dbPipeline *models.DbPipeline) (*models.DbPipeline, 
errors.Error) {
-       encKey := config.GetConfig().GetString(core.EncodeKeyEnvStr)
+       encKey := cfg.GetString(core.EncodeKeyEnvStr)
        planEncrypt, err := core.Encrypt(encKey, dbPipeline.Plan)
        if err != nil {
                return nil, err
@@ -259,7 +258,7 @@ func encryptDbPipeline(dbPipeline *models.DbPipeline) 
(*models.DbPipeline, error
 
 // encryptDbPipeline decrypts dbPipeline.Plan
 func decryptDbPipeline(dbPipeline *models.DbPipeline) (*models.DbPipeline, 
errors.Error) {
-       encKey := config.GetConfig().GetString(core.EncodeKeyEnvStr)
+       encKey := cfg.GetString(core.EncodeKeyEnvStr)
        plan, err := core.Decrypt(encKey, dbPipeline.Plan)
        if err != nil {
                return nil, err
@@ -268,11 +267,10 @@ func decryptDbPipeline(dbPipeline *models.DbPipeline) 
(*models.DbPipeline, error
        return dbPipeline, nil
 }
 
-// UpdateDbPipelineStatus update the status of pipeline
-func UpdateDbPipelineStatus(pipelineId uint64, status string) errors.Error {
-       err := db.Model(&models.DbPipeline{}).Where("id = ?", 
pipelineId).Update("status", status).Error
+func fillPipelineDetail(pipeline *models.DbPipeline) errors.Error {
+       err := db.All(&pipeline.Labels, dal.Where("pipeline_id = ?", 
pipeline.ID))
        if err != nil {
-               return errors.Convert(err)
+               return errors.Internal.Wrap(err, "error getting the pipeline 
labels from database")
        }
        return nil
 }
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index 812b1123a..4347bdcb1 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -39,7 +39,7 @@ type pipelineRunner struct {
 
 func (p *pipelineRunner) runPipelineStandalone() errors.Error {
        return runner.RunPipeline(
-               runner.CreateBasicRes(cfg, p.logger, db),
+               basicRes.ReplaceLogger(p.logger),
                p.pipeline.ID,
                func(taskIds []uint64) errors.Error {
                        return RunTasksStandalone(p.logger, taskIds)
@@ -132,10 +132,10 @@ func runPipeline(pipelineId uint64) errors.Error {
                dbPipeline.Status = models.TASK_COMPLETED
                dbPipeline.Message = ""
        }
-       dbe := db.Model(dbPipeline).Updates(dbPipeline).Error
-       if dbe != nil {
-               globalPipelineLog.Error(dbe, "update pipeline state failed")
-               return errors.Convert(dbe)
+       err = db.Update(dbPipeline)
+       if err != nil {
+               globalPipelineLog.Error(err, "update pipeline state failed")
+               return errors.Convert(err)
        }
        // notify external webhook
        return NotifyExternal(pipelineId)
diff --git a/services/project.go b/services/project.go
index 5d705ced5..0063ec355 100644
--- a/services/project.go
+++ b/services/project.go
@@ -22,286 +22,275 @@ import (
 
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models"
-       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/models/domainlayer/crossdomain"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/plugins/helper"
 )
 
 // ProjectQuery used to query projects as the api project input
 type ProjectQuery struct {
-       Page     int `form:"page"`
-       PageSize int `form:"pageSize"`
+       Pagination
 }
 
-// CreateProject accepts a project instance and insert it to database
-func CreateProject(project *models.Project) errors.Error {
-       if project.Name == "" {
-               return errors.Default.New("can not use empty name for project")
+// GetProjects returns a paginated list of Projects based on `query`
+func GetProjects(query *ProjectQuery) ([]*models.Project, int64, errors.Error) 
{
+       // verify input
+       if err := VerifyStruct(query); err != nil {
+               return nil, 0, err
        }
-
-       /*project, err := encryptProject(project)
-       if err != nil {
-               return err
-       }*/
-       err := CreateDbProject(project)
-       if err != nil {
-               return err
+       clauses := []dal.Clause{
+               dal.From(&models.Project{}),
        }
-       return nil
-}
 
-// CreateProjectMetric accepts a ProjectMetric instance and insert it to 
database
-func CreateProjectMetric(projectMetric *models.ProjectMetric) errors.Error {
-       /*enProjectMetric, err := encryptProjectMetric(projectMetric)
+       count, err := db.Count(clauses...)
        if err != nil {
-               return err
-       }*/
-       err := CreateDbProjectMetric(projectMetric)
-       if err != nil {
-               return err
+               return nil, 0, errors.Default.Wrap(err, "error getting DB count 
of project")
        }
-       return nil
-}
 
-// GetProject returns a Project
-func GetProject(name string) (*models.Project, errors.Error) {
-       project, err := GetDbProject(name)
+       clauses = append(clauses,
+               dal.Orderby("created_at DESC"),
+               dal.Offset(query.GetSkip()),
+               dal.Limit(query.GetPageSize()),
+       )
+       projects := make([]*models.Project, 0)
+       err = db.All(&projects, clauses...)
        if err != nil {
-               return nil, errors.Convert(err)
+               return nil, 0, errors.Default.Wrap(err, "error finding DB 
project")
        }
 
-       /*project, err = decryptProject(project)
-       if err != nil {
-               return nil, errors.Convert(err)
-       }*/
-
-       return project, nil
+       return projects, count, nil
 }
 
-// GetProjectMetric returns a ProjectMetric
-func GetProjectMetric(projectName string, pluginName string) 
(*models.ProjectMetric, errors.Error) {
-       projectMetric, err := GetDbProjectMetric(projectName, pluginName)
-       if err != nil {
-               return nil, errors.Convert(err)
+// CreateProject accepts a project instance and insert it to database
+func CreateProject(projectInput *models.ApiInputProject) 
(*models.ApiOutputProject, errors.Error) {
+       // verify input
+       if err := VerifyStruct(projectInput); err != nil {
+               return nil, err
        }
 
-       /*projectMetric, err = decryptProjectMetric(projectMetric)
-       if err != nil {
-               return nil, errors.Convert(err)
-       }*/
-
-       return projectMetric, nil
-}
+       // create transaction to update multiple tables
+       var err errors.Error
+       tx := db.Begin()
+       defer func() {
+               if r := recover(); r != nil || err != nil {
+                       err = tx.Rollback()
+                       if err != nil {
+                               log.Error(err, "PatchProject: failed to 
rollback")
+                       }
+               }
+       }()
 
-// FlushProjectMetrics remove all Project metrics by project name and create 
new metrics by baseMetrics
-func FlushProjectMetrics(projectName string, baseMetrics *[]models.BaseMetric) 
errors.Error {
-       err := removeAllDbProjectMetricsByProjectName(projectName)
+       // create project first
+       project := &models.Project{}
+       project.BaseProject = projectInput.BaseProject
+       err = db.Create(project)
        if err != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("error to 
removeAllDbProjectMetricsByProjectName for %s", projectName))
-       }
-
-       for _, baseMetric := range *baseMetrics {
-               err = CreateProjectMetric(&models.ProjectMetric{
-                       BaseProjectMetric: models.BaseProjectMetric{
-                               ProjectName: projectName,
-                               BaseMetric:  baseMetric,
-                       },
-               })
-               if err != nil {
-                       return errors.Default.Wrap(err, fmt.Sprintf("failed to  
CreateProjectMetric for [%s][%s]", projectName, baseMetric.PluginName))
+               if db.IsDuplicationError(err) {
+                       return nil, errors.BadInput.New(fmt.Sprintf("A project 
with name [%s] already exists", project.Name))
                }
+               return nil, errors.Default.Wrap(err, "error creating DB 
project")
        }
 
-       return nil
-}
-
-// LoadBluePrintAndMetrics load the blueprint and ProjectMetrics for 
projectOutputv
-func LoadBluePrintAndMetrics(projectOutput *models.ApiOutputProject) 
errors.Error {
-       var err errors.Error
-
-       // load Metrics
-       projectMetrics, count, err := GetProjectMetrics(projectOutput.Name)
-       if err != nil {
-               return errors.Default.Wrap(err, "Failed to get project metrics 
by project")
-       }
-       if count == 0 {
-               projectOutput.Metrics = nil
-       } else {
-               baseMetric := make([]models.BaseMetric, len(*projectMetrics))
-               for i, projectMetric := range *projectMetrics {
-                       baseMetric[i] = projectMetric.BaseMetric
+       // check if need flush the Metrics
+       if projectInput.Metrics != nil {
+               err = refreshProjectMetrics(tx, projectInput)
+               if err != nil {
+                       return nil, err
                }
-               projectOutput.Metrics = &baseMetric
        }
 
-       // load blueprint
-       projectOutput.Blueprint, err = 
GetBlueprintByProjectName(projectOutput.Name)
+       // all good, commit transaction
+       err = tx.Commit()
        if err != nil {
-               return errors.Default.Wrap(err, "Error to get blueprint by 
project")
+               return nil, err
        }
 
-       return nil
+       return makeProjectOutput(&projectInput.BaseProject)
 }
 
-// GetProjectMetrics returns all ProjectMetric of the project
-func GetProjectMetrics(projectName string) (*[]models.ProjectMetric, int64, 
errors.Error) {
-       projectMetrics, count, err := GetDbProjectMetrics(projectName)
-       if err != nil {
-               return nil, 0, errors.Convert(err)
+// GetProject returns a Project
+func GetProject(name string) (*models.ApiOutputProject, errors.Error) {
+       // verify input
+       if name == "" {
+               return nil, errors.BadInput.New("project name is missing")
        }
 
-       /*for i, projectMetric := range projectMetrics {
-               projectMetrics[i], err = decryptProjectMetric(projectMetric)
-               if err != nil {
-                       return nil, 0, err
-               }
-       }*/
-
-       return projectMetrics, count, nil
-}
-
-// GetProjects returns a paginated list of Projects based on `query`
-func GetProjects(query *ProjectQuery) ([]*models.Project, int64, errors.Error) 
{
-       projects, count, err := GetDbProjects(query)
+       // load project
+       project := &models.Project{}
+       err := db.First(project, dal.Where("name = ?", name))
        if err != nil {
-               return nil, 0, errors.Convert(err)
-       }
-
-       /*for i, project := range projects {
-               projects[i], err = decryptProject(project)
-               if err != nil {
-                       return nil, 0, err
+               if db.IsErrorNotFound(err) {
+                       return nil, errors.NotFound.Wrap(err, 
fmt.Sprintf("could not find project [%s] in DB", name))
                }
-       }*/
+               return nil, errors.Default.Wrap(err, "error getting project 
from DB")
+       }
 
-       return projects, count, nil
+       // convert to api output
+       return makeProjectOutput(&project.BaseProject)
 }
 
 // PatchProject FIXME ...
 func PatchProject(name string, body map[string]interface{}) 
(*models.ApiOutputProject, errors.Error) {
        projectInput := &models.ApiInputProject{}
-       projectOutput := &models.ApiOutputProject{}
 
-       // load record from db
-       project, err := GetProject(name)
+       // load input
+       err := helper.DecodeMapStruct(body, projectInput)
        if err != nil {
                return nil, err
        }
 
-       err = helper.DecodeMapStruct(body, projectInput)
+       // wrap all operation inside a transaction
+       tx := db.Begin()
+       defer func() {
+               if r := recover(); r != nil || err != nil {
+                       err = tx.Rollback()
+                       if err != nil {
+                               log.Error(err, "PatchProject: failed to 
rollback")
+                       }
+               }
+       }()
+
+       project := &models.Project{}
+       err = tx.First(project, dal.Where("name = ?", name), dal.Lock(true, 
false))
        if err != nil {
                return nil, err
        }
+
        // allowed to changed the name
        if projectInput.Name == "" {
                projectInput.Name = name
        }
        project.BaseProject = projectInput.BaseProject
 
-       /*enProject, err := encryptProject(project)
-       if err != nil {
-               return nil, err
-       }*/
-
-       // check if the name has changed, with the project name changing, we 
have to change the other table at the same time as follows
+       // name changed, updates the related entities as well
        if name != project.Name {
-               //project name
-               err = RenameProjectName(name, project.Name)
+               // ProjectMetric
+               err = tx.UpdateColumn(
+                       &models.ProjectMetric{},
+                       "project_name", project.Name,
+                       dal.Where("project_name = ?", name),
+               )
                if err != nil {
                        return nil, err
                }
 
-               //ProjectMetric
-               err = RenameProjectNameForProjectMetric(name, project.Name)
+               // ProjectPrMetric
+               err = tx.UpdateColumn(
+                       &crossdomain.ProjectPrMetric{},
+                       "project_name", project.Name,
+                       dal.Where("project_name = ?", name),
+               )
                if err != nil {
                        return nil, err
                }
 
-               //ProjectPrMetric
-               err = RenameProjectNameForProjectPrMetric(name, project.Name)
+               // ProjectIssueMetric
+               err = tx.UpdateColumn(
+                       &crossdomain.ProjectIssueMetric{},
+                       "project_name", project.Name,
+                       dal.Where("project_name = ?", name),
+               )
                if err != nil {
                        return nil, err
                }
 
-               //ProjectMapping
-               err = RenameProjectNameForProjectIssueMetric(name, project.Name)
+               // ProjectMapping
+               err = tx.UpdateColumn(
+                       &crossdomain.ProjectMapping{},
+                       "project_name", project.Name,
+                       dal.Where("project_name = ?", name),
+               )
                if err != nil {
                        return nil, err
                }
 
-               //Blueprint
-               err = RenameProjectNameForBlueprint(name, project.Name)
+               // Blueprint
+               err = tx.UpdateColumn(
+                       &models.DbBlueprint{},
+                       "project_name", project.Name,
+                       dal.Where("project_name = ?", name),
+               )
                if err != nil {
                        return nil, err
                }
-
-               // rename the project for each plugin
-               err = core.TraversalPlugin(func(name string, plugin 
core.PluginMeta) errors.Error {
-                       if handle, ok := plugin.(core.PluginHandle); ok {
-                               return handle.RenameProjectName(name, 
project.Name)
-                       }
-                       return nil
-               })
-               if err != nil {
-                       return nil, errors.Internal.Wrap(err, "error to rename 
project name for plugins")
-               }
        }
 
-       // save
-       err = SaveDbProject(project)
+       // Blueprint
+       err = tx.UpdateColumn(
+               &models.DbBlueprint{},
+               "enable", projectInput.Enable,
+               dal.Where("project_name = ?", name),
+       )
        if err != nil {
-               return nil, errors.Internal.Wrap(err, "error saving project")
+               return nil, err
        }
 
-       // check if need to changed the blueprint setting
-       if projectInput.Enable != nil {
-               _, err = PatchBlueprintEnableByProjectName(projectInput.Name, 
*projectInput.Enable)
+       // refresh project metrics if needed
+       if projectInput.Metrics != nil {
+               err = refreshProjectMetrics(tx, projectInput)
                if err != nil {
-                       return nil, errors.Default.Wrap(err, "Failed to set if 
project enable")
+                       return nil, err
                }
        }
 
-       // check if need flush the Metrics
-       if projectInput.Metrics != nil {
-               err = FlushProjectMetrics(projectInput.Name, 
projectInput.Metrics)
-               if err != nil {
-                       return nil, errors.Default.Wrap(err, "Failed to flush 
project metrics")
-               }
+       // update project itself
+       err = tx.Update(project)
+       if err != nil {
+               return nil, err
        }
 
-       projectOutput.BaseProject = projectInput.BaseProject
-       err = LoadBluePrintAndMetrics(projectOutput)
+       // commit the transaction
+       err = tx.Commit()
        if err != nil {
-               return nil, errors.Default.Wrap(err, fmt.Sprintf("Failed to 
LoadBluePrintAndMetrics on PatchProject for %s", projectOutput.Name))
+               return nil, err
        }
 
-       // done
-       return projectOutput, nil
+       // all good, render output
+       return makeProjectOutput(&projectInput.BaseProject)
 }
 
-// PatchProjectMetric FIXME ...
-func PatchProjectMetric(projectName string, pluginName string, body 
map[string]interface{}) (*models.ProjectMetric, errors.Error) {
-       // load record from db
-       projectMetric, err := GetDbProjectMetric(projectName, pluginName)
+func refreshProjectMetrics(tx dal.Transaction, projectInput 
*models.ApiInputProject) errors.Error {
+       err := tx.Delete(&models.ProjectMetric{}, dal.Where("project_name = ?", 
projectInput.Name))
        if err != nil {
-               return nil, err
+               return err
        }
 
-       err = helper.DecodeMapStruct(body, projectMetric)
-       if err != nil {
-               return nil, err
+       for _, baseMetric := range *projectInput.Metrics {
+               err = tx.Create(&models.ProjectMetric{
+                       BaseProjectMetric: models.BaseProjectMetric{
+                               ProjectName: projectInput.Name,
+                               BaseMetric:  baseMetric,
+                       },
+               })
+               if err != nil {
+                       return err
+               }
        }
+       return nil
+}
 
-       /*enProjectMetric, err := encryptProjectMetric(projectMetric)
+func makeProjectOutput(baseProject *models.BaseProject) 
(*models.ApiOutputProject, errors.Error) {
+       projectOutput := &models.ApiOutputProject{}
+       projectOutput.BaseProject = *baseProject
+       // load project metrics
+       projectMetrics := make([]models.ProjectMetric, 0)
+       err := db.All(&projectMetrics, dal.Where("project_name = ?", 
projectOutput.Name))
        if err != nil {
-               return nil, err
-       }*/
+               return nil, errors.Default.Wrap(err, "failed to load project 
metrics")
+       }
+       // convert metric to api output
+       if len(projectMetrics) > 0 {
+               baseMetric := make([]models.BaseMetric, len(projectMetrics))
+               for i, projectMetric := range projectMetrics {
+                       baseMetric[i] = projectMetric.BaseMetric
+               }
+               projectOutput.Metrics = &baseMetric
+       }
 
-       // save
-       err = SaveDbProjectMetric(projectMetric)
+       // load blueprint
+       projectOutput.Blueprint, err = 
GetBlueprintByProjectName(projectOutput.Name)
        if err != nil {
-               return nil, errors.Internal.Wrap(err, "error saving project")
+               return nil, errors.Default.Wrap(err, "Error to get blueprint by 
project")
        }
-
-       // done
-       return projectMetric, nil
+       return projectOutput, err
 }
diff --git a/services/project_helper.go b/services/project_helper.go
deleted file mode 100644
index 09e2e8d3f..000000000
--- a/services/project_helper.go
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package services
-
-import (
-       goerror "errors"
-       "fmt"
-       "strings"
-
-       "github.com/apache/incubator-devlake/errors"
-       "github.com/apache/incubator-devlake/models"
-       "github.com/apache/incubator-devlake/models/domainlayer/crossdomain"
-       "gorm.io/gorm"
-       "gorm.io/gorm/clause"
-)
-
-// CreateDbProject accepts a project instance and insert it to database
-func CreateDbProject(project *models.Project) errors.Error {
-       err := db.Create(project).Error
-       if err != nil {
-               if strings.Contains(strings.ToLower(err.Error()), "duplicate") {
-                       return errors.BadInput.New(fmt.Sprintf("The project 
[%s] already exists,cannot be created again", project.Name))
-               }
-               return errors.Default.Wrap(err, "error creating DB project")
-       }
-       return nil
-}
-
-// CreateDbProjectMetric accepts a project metric instance and insert it to 
database
-func CreateDbProjectMetric(projectMetric *models.ProjectMetric) errors.Error {
-       err := db.Create(projectMetric).Error
-       if err != nil {
-               return errors.Default.Wrap(err, "error creating DB project 
metric")
-       }
-       return nil
-}
-
-// SaveDbProject save a project instance and update it to database
-func SaveDbProject(project *models.Project) errors.Error {
-       err := db.Save(project).Error
-       if err != nil {
-               return errors.Default.Wrap(err, "error saving DB project")
-       }
-       return nil
-}
-
-// SaveDbProjectMetric save a project instance and update it to database
-func SaveDbProjectMetric(projectMetric *models.ProjectMetric) errors.Error {
-       err := db.Save(projectMetric).Error
-       if err != nil {
-               return errors.Default.Wrap(err, "error saving DB project 
metric")
-       }
-       return nil
-}
-
-// GetDbProjects returns a paginated list of Project based on `query`
-func GetDbProjects(query *ProjectQuery) ([]*models.Project, int64, 
errors.Error) {
-       projects := make([]*models.Project, 0)
-       db := db.Model(projects).Order("created_at desc")
-
-       var count int64
-       err := db.Count(&count).Error
-       if err != nil {
-               return nil, 0, errors.Default.Wrap(err, "error getting DB count 
of project")
-       }
-       db = processDbClausesWithPager(db, query.PageSize, query.Page)
-
-       err = db.Find(&projects).Error
-       if err != nil {
-               return nil, 0, errors.Default.Wrap(err, "error finding DB 
project")
-       }
-
-       return projects, count, nil
-}
-
-// GetDbProject returns the detail of a given project name
-func GetDbProject(name string) (*models.Project, errors.Error) {
-       project := &models.Project{}
-       project.Name = name
-
-       err := db.First(project).Error
-       if err != nil {
-               if goerror.Is(err, gorm.ErrRecordNotFound) {
-                       return nil, errors.NotFound.Wrap(err, 
fmt.Sprintf("could not find project [%s] in DB", name))
-               }
-               return nil, errors.Default.Wrap(err, "error getting project 
from DB")
-       }
-
-       return project, nil
-}
-
-// GetDbProjectMetric returns the detail of a given project name
-func GetDbProjectMetric(projectName string, pluginName string) 
(*models.ProjectMetric, errors.Error) {
-       projectMetric := &models.ProjectMetric{}
-       projectMetric.ProjectName = projectName
-       projectMetric.PluginName = pluginName
-
-       err := db.First(projectMetric).Error
-       if err != nil {
-               if goerror.Is(err, gorm.ErrRecordNotFound) {
-                       return nil, errors.NotFound.Wrap(err, 
fmt.Sprintf("could not find project metric [%s][%s] in DB", projectName, 
pluginName))
-               }
-               return nil, errors.Default.Wrap(err, "error getting project 
metric from DB")
-       }
-
-       return projectMetric, nil
-}
-
-// GetDbProjectMetrics returns all of Metrics of a given project name
-func GetDbProjectMetrics(projectName string) (*[]models.ProjectMetric, int64, 
errors.Error) {
-       projectMetrics := make([]models.ProjectMetric, 0)
-       db := db.Table(models.ProjectMetric{}.TableName()).Where("project_name 
= ?", projectName)
-
-       var count int64
-       err := db.Count(&count).Error
-       if err != nil {
-               return nil, 0, errors.Default.Wrap(err, fmt.Sprintf("could not 
get project metric count for projectName [%s] in DB", projectName))
-       }
-
-       if count == 0 {
-               return nil, 0, nil
-       }
-
-       err = db.Find(&projectMetrics).Error
-       if err != nil {
-               return nil, 0, errors.Default.Wrap(err, fmt.Sprintf("could not 
find project metric for projectName [%s] in DB", projectName))
-       }
-
-       return &projectMetrics, count, nil
-}
-
-// RenameProjectName FIXME ...
-func RenameProjectName(oldProjectName string, newProjectName string) 
errors.Error {
-       err := db.Exec(
-               "UPDATE ? SET name = ? WHERE name = ?",
-               clause.Table{Name: models.Project{}.TableName()},
-               newProjectName,
-               oldProjectName,
-       ).Error
-       if err != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("Failed to 
RenameProjectNameForProjectMetric from [%s] to [%s]", oldProjectName, 
newProjectName))
-       }
-
-       return nil
-}
-
-// RenameProjectNameForProjectMetric FIXME ...
-func RenameProjectNameForProjectMetric(oldProjectName string, newProjectName 
string) errors.Error {
-       err := db.Model(&models.ProjectMetric{}).
-               Where("project_name = ?", oldProjectName).
-               Updates(map[string]interface{}{
-                       "project_name": newProjectName,
-               }).Error
-       if err != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("Failed to 
RenameProjectNameForProjectMetric from [%s] to [%s]", oldProjectName, 
newProjectName))
-       }
-
-       return nil
-}
-
-// RenameProjectNameForProjectPrMetric FIXME ...
-func RenameProjectNameForProjectPrMetric(oldProjectName string, newProjectName 
string) errors.Error {
-       err := db.Model(&crossdomain.ProjectPrMetric{}).
-               Where("project_name = ?", oldProjectName).
-               Updates(map[string]interface{}{
-                       "project_name": newProjectName,
-               }).Error
-       if err != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("Failed to 
RenameProjectNameForProjectPrMetric from [%s] to [%s]", oldProjectName, 
newProjectName))
-       }
-
-       return nil
-}
-
-// RenameProjectNameForProjectIssueMetric FIXME ...
-func RenameProjectNameForProjectIssueMetric(oldProjectName string, 
newProjectName string) errors.Error {
-       err := db.Model(&crossdomain.ProjectIssueMetric{}).
-               Where("project_name = ?", oldProjectName).
-               Updates(map[string]interface{}{
-                       "project_name": newProjectName,
-               }).Error
-       if err != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("Failed to 
RenameProjectNameForProjectIssueMetric from [%s] to [%s]", oldProjectName, 
newProjectName))
-       }
-
-       return nil
-}
-
-// RenameProjectNameForProjectMapping FIXME ...
-func RenameProjectNameForProjectMapping(oldProjectName string, newProjectName 
string) errors.Error {
-       err := db.Model(&crossdomain.ProjectMapping{}).
-               Where("project_name = ?", oldProjectName).
-               Updates(map[string]interface{}{
-                       "project_name": newProjectName,
-               }).Error
-       if err != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("Failed to 
RenameProjectNameForProjectMapping from [%s] to [%s]", oldProjectName, 
newProjectName))
-       }
-
-       return nil
-}
-
-func removeAllDbProjectMetricsByProjectName(projectName string) errors.Error {
-       err := db.Delete(&models.ProjectMetric{}, "project_name = ?", 
projectName).Error
-       if err != nil {
-               return errors.Default.Wrap(err, "error deleting ProjectMetrics 
from DB")
-       }
-       return nil
-}
-
-// encryptProject
-/*func encryptProject(project *models.Project) (*models.Project, errors.Error) 
{
-       encKey := config.GetConfig().GetString(core.EncodeKeyEnvStr)
-
-       describeEncrypt, err := core.Encrypt(encKey, project.Description)
-       if err != nil {
-               return nil, err
-       }
-       project.Description = describeEncrypt
-
-       return project, nil
-}
-
-// encryptProjectMetric
-func encryptProjectMetric(projectMetric *models.ProjectMetric) 
(*models.ProjectMetric, errors.Error) {
-       encKey := config.GetConfig().GetString(core.EncodeKeyEnvStr)
-
-       pluginOption, err := core.Encrypt(encKey, projectMetric.PluginOption)
-       if err != nil {
-               return nil, err
-       }
-       projectMetric.PluginOption = pluginOption
-
-       return projectMetric, nil
-}*/
-
-// decryptProject
-/*func decryptProject(project *models.Project) (*models.Project, errors.Error) 
{
-       encKey := config.GetConfig().GetString(core.EncodeKeyEnvStr)
-
-       describe, err := core.Decrypt(encKey, project.Description)
-       if err != nil {
-               return nil, err
-       }
-       project.Description = describe
-
-       return project, nil
-}
-
-// decryptProjectMetric
-func decryptProjectMetric(projectMetric *models.ProjectMetric) 
(*models.ProjectMetric, errors.Error) {
-       encKey := config.GetConfig().GetString(core.EncodeKeyEnvStr)
-
-       pluginOption, err := core.Decrypt(encKey, projectMetric.PluginOption)
-       if err != nil {
-               return nil, err
-       }
-       projectMetric.PluginOption = pluginOption
-
-       return projectMetric, nil
-}*/
diff --git a/services/pushapi.go b/services/pushapi.go
index 85e850190..1269bc46a 100644
--- a/services/pushapi.go
+++ b/services/pushapi.go
@@ -17,13 +17,16 @@ limitations under the License.
 
 package services
 
-import "github.com/apache/incubator-devlake/errors"
+import (
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
+)
 
 // InsertRow FIXME ...
 func InsertRow(table string, rows []map[string]interface{}) (int64, 
errors.Error) {
-       tx := db.Table(table).Create(rows)
-       if tx.Error != nil {
-               return 0, errors.Convert(tx.Error)
+       err := db.Create(rows, dal.From(table))
+       if err != nil {
+               return 0, err
        }
-       return tx.RowsAffected, nil
+       return 1, nil
 }
diff --git a/services/task.go b/services/task.go
index 5484d48af..0630e0ddc 100644
--- a/services/task.go
+++ b/services/task.go
@@ -18,123 +18,31 @@ limitations under the License.
 package services
 
 import (
-       "context"
        "encoding/json"
-       goerror "errors"
        "fmt"
        "regexp"
-       "strconv"
        "strings"
-       "sync"
 
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/logger"
        "github.com/apache/incubator-devlake/models"
-       "github.com/apache/incubator-devlake/models/common"
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/runner"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
 var taskLog = logger.Global.Nested("task service")
 var activityPattern = regexp.MustCompile(`task #(\d+)`)
 
-// RunningTaskData FIXME ...
-type RunningTaskData struct {
-       Cancel         context.CancelFunc
-       ProgressDetail *models.TaskProgressDetail
-}
-
-// RunningTask FIXME ...
-type RunningTask struct {
-       mu    sync.Mutex
-       tasks map[uint64]*RunningTaskData
-}
-
-// Add FIXME ...
-func (rt *RunningTask) Add(taskId uint64, cancel context.CancelFunc) 
errors.Error {
-       rt.mu.Lock()
-       defer rt.mu.Unlock()
-       if _, ok := rt.tasks[taskId]; ok {
-               return errors.Default.New(fmt.Sprintf("task with id %d already 
running", taskId))
-       }
-       rt.tasks[taskId] = &RunningTaskData{
-               Cancel:         cancel,
-               ProgressDetail: &models.TaskProgressDetail{},
-       }
-       return nil
-}
-
-func (rt *RunningTask) setAll(progressDetails 
map[uint64]*models.TaskProgressDetail) {
-       rt.mu.Lock()
-       defer rt.mu.Unlock()
-       // delete finished tasks
-       for taskId := range rt.tasks {
-               if _, ok := progressDetails[taskId]; !ok {
-                       delete(rt.tasks, taskId)
-               }
-       }
-       for taskId, progressDetail := range progressDetails {
-               if _, ok := rt.tasks[taskId]; !ok {
-                       rt.tasks[taskId] = &RunningTaskData{}
-               }
-               rt.tasks[taskId].ProgressDetail = progressDetail
-       }
-}
-
-// FillProgressDetailToTasks lock less times than GetProgressDetail
-func (rt *RunningTask) FillProgressDetailToTasks(tasks []models.Task) {
-       rt.mu.Lock()
-       defer rt.mu.Unlock()
-
-       for index, task := range tasks {
-               taskId := task.ID
-               if task, ok := rt.tasks[taskId]; ok {
-                       tasks[index].ProgressDetail = task.ProgressDetail
-               }
-       }
-}
-
-// GetProgressDetail FIXME ...
-func (rt *RunningTask) GetProgressDetail(taskId uint64) 
*models.TaskProgressDetail {
-       rt.mu.Lock()
-       defer rt.mu.Unlock()
-
-       if task, ok := rt.tasks[taskId]; ok {
-               return task.ProgressDetail
-       }
-       return nil
-}
-
-// Remove FIXME ...
-func (rt *RunningTask) Remove(taskId uint64) (context.CancelFunc, 
errors.Error) {
-       rt.mu.Lock()
-       defer rt.mu.Unlock()
-       if d, ok := rt.tasks[taskId]; ok {
-               delete(rt.tasks, taskId)
-               return d.Cancel, nil
-       }
-       return nil, errors.NotFound.New(fmt.Sprintf("task with id %d not 
found", taskId))
-}
-
-var runningTasks RunningTask
-
-// TaskQuery FIXME ...
+// TaskQuery FIXME .
 type TaskQuery struct {
+       Pagination
        Status     string `form:"status"`
-       Page       int    `form:"page"`
-       PageSize   int    `form:"page_size"`
        Plugin     string `form:"plugin"`
        PipelineId uint64 `form:"pipelineId" uri:"pipelineId"`
        Pending    int    `form:"pending"`
 }
 
-func init() {
-       // set all previous unfinished tasks to status failed
-       runningTasks.tasks = make(map[uint64]*RunningTaskData)
-}
-
-// CreateTask FIXME ...
+// CreateTask creates a new task
 func CreateTask(newTask *models.NewTask) (*models.Task, errors.Error) {
        b, err := json.Marshal(newTask.Options)
        if err != nil {
@@ -145,7 +53,7 @@ func CreateTask(newTask *models.NewTask) (*models.Task, 
errors.Error) {
                return nil, errors.Convert(err)
        }
 
-       task := models.Task{
+       task := &models.Task{
                Plugin:      newTask.Plugin,
                Subtasks:    s,
                Options:     b,
@@ -155,56 +63,72 @@ func CreateTask(newTask *models.NewTask) (*models.Task, 
errors.Error) {
                PipelineRow: newTask.PipelineRow,
                PipelineCol: newTask.PipelineCol,
        }
-       err = db.Save(&task).Error
+       if newTask.IsRerun {
+               task.Status = models.TASK_RERUN
+       }
+       err = db.Create(task)
        if err != nil {
                taskLog.Error(err, "save task failed")
                return nil, errors.Internal.Wrap(err, "save task failed")
        }
-       return &task, nil
+       return task, nil
 }
 
-// GetTasks FIXME ...
-func GetTasks(query *TaskQuery) ([]models.Task, int64, errors.Error) {
-       db := db.Model(&models.Task{}).Order("id DESC")
+// GetTasks returns paginated tasks that match the given query
+func GetTasks(query *TaskQuery) ([]*models.Task, int64, errors.Error) {
+       // verify query
+       if err := VerifyStruct(query); err != nil {
+               return nil, 0, err
+       }
+
+       // construct common query clauses
+       clauses := []dal.Clause{dal.From(&models.Task{})}
        if query.Status != "" {
-               db = db.Where("status = ?", query.Status)
+               clauses = append(clauses, dal.Where("status = ?", query.Status))
        }
        if query.Plugin != "" {
-               db = db.Where("plugin = ?", query.Plugin)
+               clauses = append(clauses, dal.Where("plugin = ?", query.Plugin))
        }
        if query.PipelineId > 0 {
-               db = db.Where("pipeline_id = ?", query.PipelineId)
+               clauses = append(clauses, dal.Where("pipeline_id = ?", 
query.PipelineId))
        }
        if query.Pending > 0 {
-               db = db.Where("finished_at is null")
+               clauses = append(clauses, dal.Where("finished_at is null"))
        }
-       var count int64
-       err := db.Count(&count).Error
+
+       // count total records
+       count, err := db.Count(clauses...)
        if err != nil {
-               return nil, 0, errors.Convert(err)
+               return nil, 0, err
        }
 
-       tasks := make([]models.Task, 0)
-       err = db.Find(&tasks).Error
+       // load paginated records from db
+       clauses = append(clauses,
+               dal.Orderby("id DESC"),
+               dal.Offset(query.GetSkip()),
+               dal.Limit(query.GetPageSizeOr(10000)),
+       )
+       tasks := make([]*models.Task, 0)
+       err = db.All(&tasks, clauses...)
        if err != nil {
-               return nil, count, errors.Convert(err)
+               return nil, count, err
        }
 
+       // fill running information
        runningTasks.FillProgressDetailToTasks(tasks)
 
        return tasks, count, nil
 }
 
 // GetTasksWithLastStatus returns task list of the pipeline, only the most 
recently tasks would be returned
-func GetTasksWithLastStatus(pipelineId uint64) ([]models.Task, errors.Error) {
-       var tasks []models.Task
-       dbInner := db.Model(&models.Task{}).Order("id DESC").Where("pipeline_id 
= ?", pipelineId)
-       err := dbInner.Find(&tasks).Error
+func GetTasksWithLastStatus(pipelineId uint64) ([]*models.Task, errors.Error) {
+       var tasks []*models.Task
+       err := db.All(&tasks, dal.Where("pipeline_id = ?", pipelineId), 
dal.Orderby("id DESC"))
        if err != nil {
-               return nil, errors.Convert(err)
+               return nil, err
        }
        taskIds := make(map[int64]struct{})
-       var result []models.Task
+       var result []*models.Task
        var maxRow, maxCol int
        for _, task := range tasks {
                if task.PipelineRow > maxRow {
@@ -225,35 +149,12 @@ func GetTasksWithLastStatus(pipelineId uint64) 
([]models.Task, errors.Error) {
        return result, nil
 }
 
-// SpawnTasks create new tasks from the failed ones
-func SpawnTasks(input []models.Task) ([]models.Task, errors.Error) {
-       var result []models.Task
-       for _, task := range input {
-               task.Model = common.Model{}
-               task.Status = models.TASK_CREATED
-               task.Message = ""
-               task.Progress = 0
-               task.ProgressDetail = nil
-               task.FailedSubTask = ""
-               task.BeganAt = nil
-               task.FinishedAt = nil
-               task.SpentSeconds = 0
-               result = append(result, task)
-       }
-       err := db.Save(&result).Error
-       if err != nil {
-               taskLog.Error(err, "save task failed")
-               return nil, errors.Internal.Wrap(err, "save task failed")
-       }
-       return result, nil
-}
-
 // GetTask FIXME ...
 func GetTask(taskId uint64) (*models.Task, errors.Error) {
        task := &models.Task{}
-       err := db.First(task, taskId).Error
+       err := db.First(task, dal.Where("id = ?", taskId))
        if err != nil {
-               if goerror.Is(err, gorm.ErrRecordNotFound) {
+               if db.IsErrorNotFound(err) {
                        return nil, errors.NotFound.New("task not found")
                }
                return nil, errors.Internal.Wrap(err, "error getting the task 
from database")
@@ -312,63 +213,15 @@ func RunTasksStandalone(parentLogger core.Logger, taskIds 
[]uint64) errors.Error
        return errors.Convert(err)
 }
 
-func runTaskStandalone(parentLog core.Logger, taskId uint64) errors.Error {
-       // deferring cleaning up
-       defer func() {
-               _, _ = runningTasks.Remove(taskId)
-       }()
-       // for task cancelling
-       ctx, cancel := context.WithCancel(context.Background())
-       err := runningTasks.Add(taskId, cancel)
+// RerunTask reruns specified task
+func RerunTask(taskId uint64) (*models.Task, errors.Error) {
+       task, err := GetTask(taskId)
        if err != nil {
-               return err
-       }
-       // now , create a progress update channel and kick off
-       progress := make(chan core.RunningProgress, 100)
-       go updateTaskProgress(taskId, progress)
-       err = runner.RunTask(
-               ctx,
-               runner.CreateBasicRes(cfg, parentLog, db),
-               progress,
-               taskId,
-       )
-       close(progress)
-       return err
-}
-
-func getRunningTaskById(taskId uint64) *RunningTaskData {
-       runningTasks.mu.Lock()
-       defer runningTasks.mu.Unlock()
-
-       return runningTasks.tasks[taskId]
-}
-
-func updateTaskProgress(taskId uint64, progress chan core.RunningProgress) {
-       data := getRunningTaskById(taskId)
-       if data == nil {
-               return
+               return nil, err
        }
-       progressDetail := data.ProgressDetail
-       for p := range progress {
-               runningTasks.mu.Lock()
-               runner.UpdateProgressDetail(basicRes, taskId, progressDetail, 
&p)
-               runningTasks.mu.Unlock()
-       }
-}
-
-func getTaskIdFromActivityId(activityId string) (uint64, errors.Error) {
-       submatches := activityPattern.FindStringSubmatch(activityId)
-       if len(submatches) < 2 {
-               return 0, errors.Default.New("activityId does not match")
-       }
-       return errors.Convert01(strconv.ParseUint(submatches[1], 10, 64))
-}
-
-// DeleteCreatedTasks deletes tasks with status `TASK_CREATED`
-func DeleteCreatedTasks(pipelineId uint64) errors.Error {
-       err := db.Where("pipeline_id = ? AND status = ?", pipelineId, 
models.TASK_CREATED).Delete(&models.Task{}).Error
+       rerunTasks, err := RerunPipeline(task.PipelineId, task)
        if err != nil {
-               return errors.Convert(err)
+               return nil, err
        }
-       return nil
+       return rerunTasks[0], nil
 }
diff --git a/services/task_runner.go b/services/task_runner.go
new file mode 100644
index 000000000..373734d81
--- /dev/null
+++ b/services/task_runner.go
@@ -0,0 +1,167 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+       "context"
+       "fmt"
+       "strconv"
+       "sync"
+
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/models"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/runner"
+)
+
+// RunningTaskData is the per-task record kept in RunningTask: the cancel
+// function supplied when the task was registered and a pointer to its progress
+// detail. Cancel is nil for records that setAll creates.
+type RunningTaskData struct {
+       Cancel         context.CancelFunc
+       ProgressDetail *models.TaskProgressDetail
+}
+
+// RunningTask is a registry of RunningTaskData keyed by task id. Its methods
+// lock mu before reading or writing the tasks map.
+type RunningTask struct {
+       mu    sync.Mutex
+       tasks map[uint64]*RunningTaskData
+}
+
+// Add registers taskId together with its cancel function and a fresh, empty
+// TaskProgressDetail. When taskId is already registered it returns an error and
+// leaves the existing entry untouched.
+func (rt *RunningTask) Add(taskId uint64, cancel context.CancelFunc) 
errors.Error {
+       rt.mu.Lock()
+       defer rt.mu.Unlock()
+       if _, ok := rt.tasks[taskId]; ok {
+               return errors.Default.New(fmt.Sprintf("task with id %d already 
running", taskId))
+       }
+       rt.tasks[taskId] = &RunningTaskData{
+               Cancel:         cancel,
+               ProgressDetail: &models.TaskProgressDetail{},
+       }
+       return nil
+}
+
+// setAll synchronises the registry with progressDetails: entries whose id is
+// not a key of progressDetails are deleted, and for every id in progressDetails
+// the given pointer is stored as that entry's ProgressDetail, creating the
+// entry first when it does not exist.
+func (rt *RunningTask) setAll(progressDetails 
map[uint64]*models.TaskProgressDetail) {
+       rt.mu.Lock()
+       defer rt.mu.Unlock()
+       // delete finished tasks, i.e. ids that are absent from progressDetails
+       for taskId := range rt.tasks {
+               if _, ok := progressDetails[taskId]; !ok {
+                       delete(rt.tasks, taskId)
+               }
+       }
+       for taskId, progressDetail := range progressDetails {
+               if _, ok := rt.tasks[taskId]; !ok {
+                       // entries created here carry no Cancel function
+                       rt.tasks[taskId] = &RunningTaskData{}
+               }
+               rt.tasks[taskId].ProgressDetail = progressDetail
+       }
+}
+
+// FillProgressDetailToTasks sets ProgressDetail on every element of tasks whose
+// ID has an entry in the registry; elements without an entry are left as they
+// are. It takes the lock once for the whole slice, whereas calling
+// GetProgressDetail per task would lock once per call.
+func (rt *RunningTask) FillProgressDetailToTasks(tasks []*models.Task) {
+       rt.mu.Lock()
+       defer rt.mu.Unlock()
+
+       for index, task := range tasks {
+               taskId := task.ID
+               // the inner `task` shadows the loop variable and is the registry entry
+               if task, ok := rt.tasks[taskId]; ok {
+                       tasks[index].ProgressDetail = task.ProgressDetail
+               }
+       }
+}
+
+// GetProgressDetail returns the ProgressDetail pointer stored for taskId, or nil
+// when taskId has no entry in the registry.
+func (rt *RunningTask) GetProgressDetail(taskId uint64) 
*models.TaskProgressDetail {
+       rt.mu.Lock()
+       defer rt.mu.Unlock()
+
+       if task, ok := rt.tasks[taskId]; ok {
+               return task.ProgressDetail
+       }
+       return nil
+}
+
+// Remove deletes the entry for taskId and returns the Cancel function that was
+// stored with it; the function is returned, not invoked. It returns a NotFound
+// error when taskId has no entry.
+func (rt *RunningTask) Remove(taskId uint64) (context.CancelFunc, 
errors.Error) {
+       rt.mu.Lock()
+       defer rt.mu.Unlock()
+       if d, ok := rt.tasks[taskId]; ok {
+               delete(rt.tasks, taskId)
+               return d.Cancel, nil
+       }
+       return nil, errors.NotFound.New(fmt.Sprintf("task with id %d not 
found", taskId))
+}
+
+// runningTasks is the package-level registry used by the functions below.
+var runningTasks RunningTask
+
+// init allocates the registry's map; nothing else happens here.
+func init() {
+       // the map must exist before Add/setAll write to it (a nil map panics on write)
+       runningTasks.tasks = make(map[uint64]*RunningTaskData)
+}
+
+// runTaskStandalone runs the task identified by taskId through runner.RunTask
+// and returns that call's error. For the duration of the run the task is
+// registered in runningTasks with the cancel function of a fresh cancellable
+// context, and values received on the progress channel handed to the runner are
+// consumed by updateTaskProgress in a separate goroutine. parentLog replaces the
+// logger of the package-level basicRes for this run.
+//
+// NOTE(review): the deferred Remove is installed before Add, so when Add fails
+// because taskId is already registered, the deferred call removes the entry
+// belonging to the earlier registration — confirm whether that is intended.
+// NOTE(review): cancel is only handed to the registry; nothing in this function
+// calls it once RunTask has returned.
+func runTaskStandalone(parentLog core.Logger, taskId uint64) errors.Error {
+       // deferring cleaning up
+       defer func() {
+               _, _ = runningTasks.Remove(taskId)
+       }()
+       // for task cancelling
+       ctx, cancel := context.WithCancel(context.Background())
+       err := runningTasks.Add(taskId, cancel)
+       if err != nil {
+               return err
+       }
+       // now, create a progress update channel and kick off
+       progress := make(chan core.RunningProgress, 100)
+       go updateTaskProgress(taskId, progress)
+       err = runner.RunTask(
+               ctx,
+               basicRes.ReplaceLogger(parentLog),
+               progress,
+               taskId,
+       )
+       // closing the channel ends the range loop in updateTaskProgress
+       close(progress)
+       return err
+}
+
+// getRunningTaskById returns the registry entry for taskId while holding the
+// registry lock, or nil when there is no such entry.
+func getRunningTaskById(taskId uint64) *RunningTaskData {
+       runningTasks.mu.Lock()
+       defer runningTasks.mu.Unlock()
+
+       return runningTasks.tasks[taskId]
+}
+
+// updateTaskProgress applies every value received from progress to the
+// ProgressDetail of taskId's registry entry via runner.UpdateProgressDetail,
+// holding the registry lock around each update. It returns when progress is
+// closed, or immediately when taskId has no registry entry.
+func updateTaskProgress(taskId uint64, progress chan core.RunningProgress) {
+       data := getRunningTaskById(taskId)
+       if data == nil {
+               // NOTE(review): returning here leaves progress undrained; a sender
+               // would presumably block once the channel buffer fills — confirm.
+               return
+       }
+       progressDetail := data.ProgressDetail
+       for p := range progress {
+               runningTasks.mu.Lock()
+               runner.UpdateProgressDetail(basicRes, taskId, progressDetail, 
&p)
+               runningTasks.mu.Unlock()
+       }
+}
+
+// getTaskIdFromActivityId extracts a task id from activityId by matching it
+// against activityPattern (declared elsewhere in the package) and parsing the
+// first capture group as a base-10 unsigned 64-bit integer. It returns an error
+// when the pattern yields no capture group or the parse fails.
+func getTaskIdFromActivityId(activityId string) (uint64, errors.Error) {
+       submatches := activityPattern.FindStringSubmatch(activityId)
+       // index 0 is the whole match, so a capture group needs at least 2 elements
+       if len(submatches) < 2 {
+               return 0, errors.Default.New("activityId does not match")
+       }
+       // presumably Convert01 adapts the (value, error) pair to errors.Error — confirm
+       return errors.Convert01(strconv.ParseUint(submatches[1], 10, 64))
+}

Reply via email to