likyh commented on code in PR #3680:
URL:
https://github.com/apache/incubator-devlake/pull/3680#discussion_r1016371709
##########
api/task/task.go:
##########
@@ -23,67 +23,120 @@ import (
"github.com/apache/incubator-devlake/api/shared"
"github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/services"
"github.com/gin-gonic/gin"
)
-/*
-Get list of pipelines
-GET
/pipelines/pipeline:id/tasks?status=TASK_RUNNING&pending=1&page=1&=pagesize=10
-{
- "tasks": [
- {"id": 1, "plugin": "", ...}
- ],
- "count": 5
+func Delete(c *gin.Context) {
+ taskId := c.Param("taskId")
+ id, err := strconv.ParseUint(taskId, 10, 64)
+ if err != nil {
+ shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid
task ID format"))
+ return
+ }
+ err = services.CancelTask(id)
+ if err != nil {
+ shared.ApiOutputError(c, errors.Default.Wrap(err, "error
cancelling task"))
+ return
+ }
+ shared.ApiOutputSuccess(c, nil, http.StatusOK)
}
-*/
-// @Summary Get task
-// @Description get task
-// @Description SAMPLE
-// @Description {
-// @Description "tasks": [
-// @Description {"id": 1, "plugin": "", ...}
-// @Description ],
-// @Description "count": 5
-// @Description }
+
+type getTaskResponse struct {
+ Tasks []models.Task `json:"tasks"`
+ Count int `json:"count"`
+}
+
+// GetTaskByPipeline return most recent tasks
+// @Summary Get tasks, only the most recent tasks will be returned
// @Tags framework/task
// @Accept application/json
// @Param pipelineId path int true "pipelineId"
-// @Success 200 {string} gin.H "{"tasks": tasks, "count": count}"
-// @Failure 400 {string} errcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internel Error"
+// @Success 200 {object} getTaskResponse
+// @Failure 400 {object} shared.ApiBody "Bad Request"
+// @Failure 500 {object} shared.ApiBody "Internal Error"
// @Router /pipelines/{pipelineId}/tasks [get]
-func Index(c *gin.Context) {
- var query services.TaskQuery
- err := c.ShouldBindQuery(&query)
- if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err,
shared.BadRequestBody))
- return
- }
- err = c.ShouldBindUri(&query)
+func GetTaskByPipeline(c *gin.Context) {
+ pipelineId, err := strconv.ParseUint(c.Param("pipelineId"), 10, 64)
if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad request
URI format"))
+ shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid
pipeline ID format"))
return
}
- tasks, count, err := services.GetTasks(&query)
+ tasks, err := services.GetTasksWithLastStatus(pipelineId)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error
getting tasks"))
return
}
- shared.ApiOutputSuccess(c, gin.H{"tasks": tasks, "count": count},
http.StatusOK)
+ shared.ApiOutputSuccess(c, getTaskResponse{Tasks: tasks, Count:
len(tasks)}, http.StatusOK)
}
-func Delete(c *gin.Context) {
- taskId := c.Param("taskId")
- id, err := strconv.ParseUint(taskId, 10, 64)
+type rerunRequest struct {
+ TaskId uint64 `json:"taskId"`
+}
+
+// RerunTask rerun the specified the task. If taskId is 0, all failed tasks of
this pipeline will rerun
+// @Summary rerun tasks
+// @Tags framework/task
+// @Accept application/json
+// @Param pipelineId path int true "pipelineId"
+// @Param request body rerunRequest false "specify the task to rerun. If it's
0, all failed tasks of this pipeline will rerun"
+// @Success 200 {object} shared.ApiBody
+// @Failure 400 {object} shared.ApiBody "Bad Request"
+// @Failure 500 {object} shared.ApiBody "Internal Error"
+// @Router /pipelines/{pipelineId}/tasks [post]
+func RerunTask(c *gin.Context) {
+ var request rerunRequest
+ err := c.BindJSON(&request)
if err != nil {
shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid
task ID format"))
return
}
- err = services.CancelTask(id)
+ pipelineId, err := strconv.ParseUint(c.Param("pipelineId"), 10, 64)
if err != nil {
- shared.ApiOutputError(c, errors.Default.Wrap(err, "error
cancelling task"))
+ shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid
pipeline ID format"))
return
}
+ pipeline, err := services.GetPipeline(pipelineId)
+ if err != nil {
+ shared.ApiOutputError(c, errors.Default.Wrap(err, "error get
pipeline"))
+ return
+ }
+ var taskIds []uint64
+ var failedTasks []models.Task
+ if request.TaskId > 0 {
+ failedTask, err := services.GetTask(request.TaskId)
+ if err != nil || failedTask == nil {
+ shared.ApiOutputError(c, errors.Default.Wrap(err,
"error getting failed task"))
+ return
+ }
+ failedTasks = append(failedTasks, *failedTask)
+ } else {
+ tasks, err := services.GetTasksWithLastStatus(pipelineId)
+ if err != nil {
+ shared.ApiOutputError(c, errors.Default.Wrap(err,
"error getting tasks"))
+ return
+ }
+ for _, task := range tasks {
+ if task.Status == models.TASK_FAILED {
+ failedTasks = append(failedTasks, task)
+ }
+ }
+ }
+ if len(failedTasks) == 0 {
+ shared.ApiOutputSuccess(c, nil, http.StatusOK)
+ return
+ }
+ newTasks, err := services.SpawnTasks(failedTasks)
+ if err != nil {
+ shared.ApiOutputError(c, errors.Default.Wrap(err, "error create
tasks"))
+ return
+ }
+ for _, task := range newTasks {
+ taskIds = append(taskIds, task.ID)
+ }
+ go func() {
+ _ =
services.RunTasksStandalone(services.GetPipelineLogger(pipeline), taskIds)
Review Comment:
If this is run directly here, it will bypass the concurrency control enforced
by `PIPELINE_MAX_PARALLEL`, and all tasks will run in parallel.
##########
services/task.go:
##########
@@ -194,6 +196,52 @@ func GetTasks(query *TaskQuery) ([]models.Task, int64,
errors.Error) {
return tasks, count, nil
}
+// GetTasksWithLastStatus returns task list of the pipeline, only the most
recently tasks would be returned
+func GetTasksWithLastStatus(pipelineId uint64) ([]models.Task, errors.Error) {
+ var tasks []models.Task
+ dbInner := db.Model(&models.Task{}).Order("id DESC").Where("pipeline_id
= ?", pipelineId)
+ err := dbInner.Find(&tasks).Error
+ if err != nil {
+ return nil, errors.Convert(err)
+ }
+ taskIds := make(map[int64]struct{})
+ var result []models.Task
+ var maxRow, maxCol int
+ for _, task := range tasks {
+ if task.PipelineRow > maxRow {
+ maxRow = task.PipelineRow
+ }
+ if task.PipelineCol > maxCol {
+ maxCol = task.PipelineCol
+ }
+ }
+ for _, task := range tasks {
+ index := int64(task.PipelineRow)*int64(maxCol) +
int64(task.PipelineCol)
+ if _, ok := taskIds[index]; !ok {
+ taskIds[index] = struct{}{}
+ result = append(result, task)
+ }
+ }
+ runningTasks.FillProgressDetailToTasks(result)
+ return result, nil
+}
+
+// SpawnTasks create new tasks from the failed ones
+func SpawnTasks(input []models.Task) ([]models.Task, errors.Error) {
+ var result []models.Task
+ for _, task := range input {
+ task.Model = common.Model{}
Review Comment:
Could you pick only the necessary columns here? It seems that some unnecessary
columns, such as `FinishedAt`, have been copied to the new task.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]