This is an automated email from the ASF dual-hosted git repository. klesh pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push: new ac3b1d9cb feat: jira epic collection / extraction support incremental mode for better performance (#8414) ac3b1d9cb is described below commit ac3b1d9cb317e6a08b1412ddef0de7bbb9b97c9e Author: Klesh Wong <zhenmian.hu...@merico.dev> AuthorDate: Tue Apr 29 16:04:12 2025 +0800 feat: jira epic collection / extraction support incremental mode for better performance (#8414) --- backend/core/config/config_viper.go | 1 + backend/plugins/jira/tasks/epic_collector.go | 9 ++++-- backend/plugins/jira/tasks/epic_extractor.go | 47 +++++++++++++++++++--------- backend/server/api/api.go | 1 + backend/server/services/pipeline.go | 7 +++-- 5 files changed, 45 insertions(+), 20 deletions(-) diff --git a/backend/core/config/config_viper.go b/backend/core/config/config_viper.go index 16060c46b..571439e47 100644 --- a/backend/core/config/config_viper.go +++ b/backend/core/config/config_viper.go @@ -106,6 +106,7 @@ func setDefaultValue(v *viper.Viper) { v.SetDefault("SWAGGER_DOCS_DIR", "resources/swagger") v.SetDefault("RESUME_PIPELINES", true) v.SetDefault("CORS_ALLOW_ORIGIN", "*") + v.SetDefault("CONSUME_PIPELINES", true) } func init() { diff --git a/backend/plugins/jira/tasks/epic_collector.go b/backend/plugins/jira/tasks/epic_collector.go index 959b0e3c8..dcb163309 100644 --- a/backend/plugins/jira/tasks/epic_collector.go +++ b/backend/plugins/jira/tasks/epic_collector.go @@ -78,6 +78,9 @@ func CollectEpics(taskCtx plugin.SubTaskContext) errors.Error { logger.Info("got user's timezone: %v", loc.String()) } jql := "ORDER BY created ASC" + if apiCollector.GetSince() != nil { + jql = buildJQL(*apiCollector.GetSince(), loc) + } err = apiCollector.InitCollector(api.ApiCollectorArgs{ ApiClient: data.ApiClient, @@ -90,7 +93,7 @@ func CollectEpics(taskCtx plugin.SubTaskContext) errors.Error { for _, e := range reqData.Input.([]interface{}) { epicKeys = append(epicKeys, *e.(*string)) } - localJQL := fmt.Sprintf("issue in (%s) %s", strings.Join(epicKeys, ","), jql) + localJQL := fmt.Sprintf("issue in (%s) and %s", strings.Join(epicKeys, ","), jql) query.Set("jql", localJQL) query.Set("startAt", fmt.Sprintf("%v", reqData.Pager.Skip)) query.Set("maxResults", fmt.Sprintf("%v", reqData.Pager.Size)) @@ -130,12 +133,12 @@ func GetEpicKeysIterator(db dal.Dal, data *JiraTaskData, batchSize int) (api.Ite dal.Join(` LEFT JOIN _tool_jira_board_issues bi ON ( i.connection_id = bi.connection_id - AND + AND i.issue_id = bi.issue_id )`), dal.Where(` i.connection_id = ? - AND + AND bi.board_id = ? AND i.epic_key != '' diff --git a/backend/plugins/jira/tasks/epic_extractor.go b/backend/plugins/jira/tasks/epic_extractor.go index 89f2ef70c..fa7a4d741 100644 --- a/backend/plugins/jira/tasks/epic_extractor.go +++ b/backend/plugins/jira/tasks/epic_extractor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "encoding/json" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/log" @@ -39,12 +37,12 @@ var ExtractEpicsMeta = plugin.SubTaskMeta{ DomainTypes: []string{plugin.DOMAIN_TYPE_TICKET, plugin.DOMAIN_TYPE_CROSS}, } -func ExtractEpics(taskCtx plugin.SubTaskContext) errors.Error { - data := taskCtx.GetData().(*JiraTaskData) - db := taskCtx.GetDal() +func ExtractEpics(subtaskCtx plugin.SubTaskContext) errors.Error { + data := subtaskCtx.GetData().(*JiraTaskData) + db := subtaskCtx.GetDal() connectionId := data.Options.ConnectionId boardId := data.Options.BoardId - logger := taskCtx.GetLogger() + logger := subtaskCtx.GetLogger() logger.Info("extract external epic Issues, connection_id=%d, board_id=%d", connectionId, boardId) mappings, err := getTypeMappings(data, db) if err != nil { @@ -54,21 +52,40 @@ func ExtractEpics(taskCtx plugin.SubTaskContext) errors.Error { if err != nil { return err } - extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + + extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[apiv2models.Issue]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: subtaskCtx, + Table: RAW_EPIC_TABLE, Params: JiraApiParams{ ConnectionId: data.Options.ConnectionId, BoardId: data.Options.BoardId, }, - Table: RAW_EPIC_TABLE, + SubtaskConfig: map[string]any{ + "typeMappings": mappings, + "storyPointField": data.Options.ScopeConfig.StoryPointField, + }, }, - Extract: func(row *api.RawData) ([]interface{}, errors.Error) { - apiIssue := &apiv2models.Issue{} - err = errors.Convert(json.Unmarshal(row.Data, apiIssue)) - if err != nil { - return nil, err + BeforeExtract: func(apiIssue *apiv2models.Issue, stateManager *api.SubtaskStateManager) errors.Error { + if stateManager.IsIncremental() { + err := db.Delete( + &models.JiraIssueLabel{}, + dal.Where("connection_id = ? AND issue_id = ?", data.Options.ConnectionId, apiIssue.ID), + ) + if err != nil { + return err + } + err = db.Delete( + &models.JiraIssueRelationship{}, + dal.Where("connection_id = ? AND issue_id = ?", data.Options.ConnectionId, apiIssue.ID), + ) + if err != nil { + return err + } } + return nil + }, + Extract: func(apiIssue *apiv2models.Issue, row *api.RawData) ([]interface{}, errors.Error) { return extractIssues(data, mappings, apiIssue, row, userFieldMap) }, }) diff --git a/backend/server/api/api.go b/backend/server/api/api.go index a2b590be4..a407cdce4 100644 --- a/backend/server/api/api.go +++ b/backend/server/api/api.go @@ -118,6 +118,7 @@ func SetupApiServer(router *gin.Engine) { // Required for `/projects/hello%20%2F%20world` to be parsed properly with `/projects/:projectName` // end up with `name = "hello / world"` router.UseRawPath = true + // router.UnescapePathValues = false // Endpoint to proceed database migration router.GET("/proceed-db-migration", func(ctx *gin.Context) { diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go index f0c7f3133..41e8d8c28 100644 --- a/backend/server/services/pipeline.go +++ b/backend/server/services/pipeline.go @@ -20,7 +20,6 @@ package services import ( "context" "fmt" - "golang.org/x/sync/errgroup" "net/url" "os" "path/filepath" @@ -28,6 +27,8 @@ import ( "sync" "time" + "golang.org/x/sync/errgroup" + "github.com/spf13/cast" "github.com/apache/incubator-devlake/core/dal" @@ -107,7 +108,9 @@ func pipelineServiceInit() { pipelineMaxParallel = 10000 } // run pipeline with independent goroutine - go RunPipelineInQueue(pipelineMaxParallel) + if cfg.GetBool("CONSUME_PIPELINES") { + go RunPipelineInQueue(pipelineMaxParallel) + } } func markInterruptedPipelineAs(status string) {