likyh commented on code in PR #3680:
URL: 
https://github.com/apache/incubator-devlake/pull/3680#discussion_r1016371709


##########
api/task/task.go:
##########
@@ -23,67 +23,120 @@ import (
 
        "github.com/apache/incubator-devlake/api/shared"
        "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/models"
        "github.com/apache/incubator-devlake/services"
        "github.com/gin-gonic/gin"
 )
 
-/*
-Get list of pipelines
-GET 
/pipelines/pipeline:id/tasks?status=TASK_RUNNING&pending=1&page=1&=pagesize=10
-{
-       "tasks": [
-               {"id": 1, "plugin": "", ...}
-       ],
-       "count": 5
+func Delete(c *gin.Context) {
+       taskId := c.Param("taskId")
+       id, err := strconv.ParseUint(taskId, 10, 64)
+       if err != nil {
+               shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid 
task ID format"))
+               return
+       }
+       err = services.CancelTask(id)
+       if err != nil {
+               shared.ApiOutputError(c, errors.Default.Wrap(err, "error 
cancelling task"))
+               return
+       }
+       shared.ApiOutputSuccess(c, nil, http.StatusOK)
 }
-*/
-// @Summary Get task
-// @Description get task
-// @Description SAMPLE
-// @Description {
-// @Description        "tasks": [
-// @Description                {"id": 1, "plugin": "", ...}
-// @Description        ],
-// @Description        "count": 5
-// @Description }
+
+type getTaskResponse struct {
+       Tasks []models.Task `json:"tasks"`
+       Count int           `json:"count"`
+}
+
+// GetTaskByPipeline return most recent tasks
+// @Summary Get tasks, only the most recent tasks will be returned
 // @Tags framework/task
 // @Accept application/json
 // @Param pipelineId path int true "pipelineId"
-// @Success 200  {string} gin.H "{"tasks": tasks, "count": count}"
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internel Error"
+// @Success 200  {object} getTaskResponse
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
 // @Router /pipelines/{pipelineId}/tasks [get]
-func Index(c *gin.Context) {
-       var query services.TaskQuery
-       err := c.ShouldBindQuery(&query)
-       if err != nil {
-               shared.ApiOutputError(c, errors.BadInput.Wrap(err, 
shared.BadRequestBody))
-               return
-       }
-       err = c.ShouldBindUri(&query)
+func GetTaskByPipeline(c *gin.Context) {
+       pipelineId, err := strconv.ParseUint(c.Param("pipelineId"), 10, 64)
        if err != nil {
-               shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad request 
URI format"))
+               shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid 
pipeline ID format"))
                return
        }
-       tasks, count, err := services.GetTasks(&query)
+       tasks, err := services.GetTasksWithLastStatus(pipelineId)
        if err != nil {
                shared.ApiOutputError(c, errors.Default.Wrap(err, "error 
getting tasks"))
                return
        }
-       shared.ApiOutputSuccess(c, gin.H{"tasks": tasks, "count": count}, 
http.StatusOK)
+       shared.ApiOutputSuccess(c, getTaskResponse{Tasks: tasks, Count: 
len(tasks)}, http.StatusOK)
 }
 
-func Delete(c *gin.Context) {
-       taskId := c.Param("taskId")
-       id, err := strconv.ParseUint(taskId, 10, 64)
+type rerunRequest struct {
+       TaskId uint64 `json:"taskId"`
+}
+
+// RerunTask rerun the specified the task. If taskId is 0, all failed tasks of 
this pipeline will rerun
+// @Summary rerun tasks
+// @Tags framework/task
+// @Accept application/json
+// @Param pipelineId path int true "pipelineId"
+// @Param request body rerunRequest false "specify the task to rerun. If it's 
0, all failed tasks of this pipeline will rerun"
+// @Success 200  {object} shared.ApiBody
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /pipelines/{pipelineId}/tasks [post]
+func RerunTask(c *gin.Context) {
+       var request rerunRequest
+       err := c.BindJSON(&request)
        if err != nil {
                shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid 
task ID format"))
                return
        }
-       err = services.CancelTask(id)
+       pipelineId, err := strconv.ParseUint(c.Param("pipelineId"), 10, 64)
        if err != nil {
-               shared.ApiOutputError(c, errors.Default.Wrap(err, "error 
cancelling task"))
+               shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid 
pipeline ID format"))
                return
        }
+       pipeline, err := services.GetPipeline(pipelineId)
+       if err != nil {
+               shared.ApiOutputError(c, errors.Default.Wrap(err, "error get 
pipeline"))
+               return
+       }
+       var taskIds []uint64
+       var failedTasks []models.Task
+       if request.TaskId > 0 {
+               failedTask, err := services.GetTask(request.TaskId)
+               if err != nil || failedTask == nil {
+                       shared.ApiOutputError(c, errors.Default.Wrap(err, 
"error getting failed task"))
+                       return
+               }
+               failedTasks = append(failedTasks, *failedTask)
+       } else {
+               tasks, err := services.GetTasksWithLastStatus(pipelineId)
+               if err != nil {
+                       shared.ApiOutputError(c, errors.Default.Wrap(err, 
"error getting tasks"))
+                       return
+               }
+               for _, task := range tasks {
+                       if task.Status == models.TASK_FAILED {
+                               failedTasks = append(failedTasks, task)
+                       }
+               }
+       }
+       if len(failedTasks) == 0 {
+               shared.ApiOutputSuccess(c, nil, http.StatusOK)
+               return
+       }
+       newTasks, err := services.SpawnTasks(failedTasks)
+       if err != nil {
+               shared.ApiOutputError(c, errors.Default.Wrap(err, "error create 
tasks"))
+               return
+       }
+       for _, task := range newTasks {
+               taskIds = append(taskIds, task.ID)
+       }
+       go func() {
+               _ = 
services.RunTasksStandalone(services.GetPipelineLogger(pipeline), taskIds)

Review Comment:
   If run here direct, it will break running control by `PIPELINE_MAX_PARALLEL` 
and all tasks will run parallel.



##########
services/task.go:
##########
@@ -194,6 +196,52 @@ func GetTasks(query *TaskQuery) ([]models.Task, int64, 
errors.Error) {
        return tasks, count, nil
 }
 
+// GetTasksWithLastStatus returns task list of the pipeline, only the most 
recently tasks would be returned
+func GetTasksWithLastStatus(pipelineId uint64) ([]models.Task, errors.Error) {
+       var tasks []models.Task
+       dbInner := db.Model(&models.Task{}).Order("id DESC").Where("pipeline_id 
= ?", pipelineId)
+       err := dbInner.Find(&tasks).Error
+       if err != nil {
+               return nil, errors.Convert(err)
+       }
+       taskIds := make(map[int64]struct{})
+       var result []models.Task
+       var maxRow, maxCol int
+       for _, task := range tasks {
+               if task.PipelineRow > maxRow {
+                       maxRow = task.PipelineRow
+               }
+               if task.PipelineCol > maxCol {
+                       maxCol = task.PipelineCol
+               }
+       }
+       for _, task := range tasks {
+               index := int64(task.PipelineRow)*int64(maxCol) + 
int64(task.PipelineCol)
+               if _, ok := taskIds[index]; !ok {
+                       taskIds[index] = struct{}{}
+                       result = append(result, task)
+               }
+       }
+       runningTasks.FillProgressDetailToTasks(result)
+       return result, nil
+}
+
+// SpawnTasks create new tasks from the failed ones
+func SpawnTasks(input []models.Task) ([]models.Task, errors.Error) {
+       var result []models.Task
+       for _, task := range input {
+               task.Model = common.Model{}

Review Comment:
   Just pick the necessary column here? It seems that some unnecessary columns 
like `FinishedAt`  have been copied to new task



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to