This is an automated email from the ASF dual-hosted git repository. abeizn pushed a commit to branch feat#6088 in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 01505fc74dd94545c084d118130454851301adcf Author: abeizn <[email protected]> AuthorDate: Tue Nov 14 16:40:05 2023 +0800 feat: add skip collector at advanced mode when subtask not nil --- backend/server/services/blueprint.go | 22 ++++++++- backend/server/services/blueprint_makeplan_v200.go | 14 +----- backend/server/services/blueprint_test.go | 55 ++++++++++++++++++++++ 3 files changed, 77 insertions(+), 14 deletions(-) diff --git a/backend/server/services/blueprint.go b/backend/server/services/blueprint.go index 34eef95d9..7363f229a 100644 --- a/backend/server/services/blueprint.go +++ b/backend/server/services/blueprint.go @@ -285,7 +285,11 @@ func createPipelineByBlueprint(blueprint *models.Blueprint, syncPolicy *models.S return nil, err } } else { - plan = blueprint.Plan + if syncPolicy != nil && syncPolicy.SkipCollectors { + plan = removeCollectorTasks(blueprint.Plan) + } else { + plan = blueprint.Plan + } } newPipeline := models.NewPipeline{} @@ -388,3 +392,19 @@ func TriggerBlueprint(id uint64, syncPolicy *models.SyncPolicy) (*models.Pipelin return createPipelineByBlueprint(blueprint, syncPolicy) } + +func removeCollectorTasks(plan models.PipelinePlan) models.PipelinePlan { + for j, stage := range plan { + for k, task := range stage { + newSubtasks := make([]string, 0, len(task.Subtasks)) + for _, subtask := range task.Subtasks { + if !strings.Contains(strings.ToLower(subtask), "collect") { + newSubtasks = append(newSubtasks, subtask) + } + } + task.Subtasks = newSubtasks + plan[j][k] = task + } + } + return plan +} diff --git a/backend/server/services/blueprint_makeplan_v200.go b/backend/server/services/blueprint_makeplan_v200.go index 69e11bfc2..cc9a2d30a 100644 --- a/backend/server/services/blueprint_makeplan_v200.go +++ b/backend/server/services/blueprint_makeplan_v200.go @@ -20,7 +20,6 @@ package services import ( "encoding/json" "fmt" - "strings" "github.com/apache/incubator-devlake/core/errors" coreModels "github.com/apache/incubator-devlake/core/models" @@ -73,18 +72,7 @@ func GeneratePlanJsonV200( // skip collectors if skipCollectors { for i, plan := range sourcePlans { - for j, stage := range plan { - for k, task := range stage { - newSubtasks := make([]string, 0, len(task.Subtasks)) - for _, subtask := range task.Subtasks { - if !strings.Contains(strings.ToLower(subtask), "collect") { - newSubtasks = append(newSubtasks, subtask) - } - } - task.Subtasks = newSubtasks - sourcePlans[i][j][k] = task - } - } + sourcePlans[i] = removeCollectorTasks(plan) } // remove gitextractor plugin if it's not the only task diff --git a/backend/server/services/blueprint_test.go b/backend/server/services/blueprint_test.go index 3a4fc95ff..f0a607047 100644 --- a/backend/server/services/blueprint_test.go +++ b/backend/server/services/blueprint_test.go @@ -92,3 +92,58 @@ func TestParallelizePipelineTasks(t *testing.T) { ParallelizePipelinePlans(plan1, plan2, plan3), ) } + +func TestRemoveCollectorTasks(t *testing.T) { + plan1 := coreModels.PipelinePlan{ + { + { + Plugin: "github", + Subtasks: []string{ + "CollectApiPipelines", + "ExtractApiPipelines", + "collectApiPipelineDetails", + "extractApiPipelineDetails", + "collectApiJobs", + "extractApiJobs", + "collectAccounts", + "extractAccounts", + "ConvertAccounts", + "convertApiProject", + "convertPipelines", + "convertPipelineCommits", + "convertJobs", + }, + }, + }, + { + { + Plugin: "starrocks", + Subtasks: []string{}, + }, + }, + } + assert.Equal(t, coreModels.PipelinePlan{ + { + { + Plugin: "github", + Subtasks: []string{ + "ExtractApiPipelines", + "extractApiPipelineDetails", + "extractApiJobs", + "extractAccounts", + "ConvertAccounts", + "convertApiProject", + "convertPipelines", + "convertPipelineCommits", + "convertJobs", + }, + }, + }, + { + { + Plugin: "starrocks", + Subtasks: []string{}, + }, + }, + }, removeCollectorTasks(plan1)) +}
