This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new ac3b1d9cb feat: jira epic collection / extraction support incremental mode for better performance (#8414)
ac3b1d9cb is described below

commit ac3b1d9cb317e6a08b1412ddef0de7bbb9b97c9e
Author: Klesh Wong <zhenmian.hu...@merico.dev>
AuthorDate: Tue Apr 29 16:04:12 2025 +0800

    feat: jira epic collection / extraction support incremental mode for better performance (#8414)
---
 backend/core/config/config_viper.go          |  1 +
 backend/plugins/jira/tasks/epic_collector.go |  9 ++++--
 backend/plugins/jira/tasks/epic_extractor.go | 47 +++++++++++++++++++---------
 backend/server/api/api.go                    |  1 +
 backend/server/services/pipeline.go          |  7 +++--
 5 files changed, 45 insertions(+), 20 deletions(-)

diff --git a/backend/core/config/config_viper.go b/backend/core/config/config_viper.go
index 16060c46b..571439e47 100644
--- a/backend/core/config/config_viper.go
+++ b/backend/core/config/config_viper.go
@@ -106,6 +106,7 @@ func setDefaultValue(v *viper.Viper) {
        v.SetDefault("SWAGGER_DOCS_DIR", "resources/swagger")
        v.SetDefault("RESUME_PIPELINES", true)
        v.SetDefault("CORS_ALLOW_ORIGIN", "*")
+       v.SetDefault("CONSUME_PIPELINES", true)
 }
 
 func init() {
diff --git a/backend/plugins/jira/tasks/epic_collector.go b/backend/plugins/jira/tasks/epic_collector.go
index 959b0e3c8..dcb163309 100644
--- a/backend/plugins/jira/tasks/epic_collector.go
+++ b/backend/plugins/jira/tasks/epic_collector.go
@@ -78,6 +78,9 @@ func CollectEpics(taskCtx plugin.SubTaskContext) errors.Error {
                logger.Info("got user's timezone: %v", loc.String())
        }
        jql := "ORDER BY created ASC"
+       if apiCollector.GetSince() != nil {
+               jql = buildJQL(*apiCollector.GetSince(), loc)
+       }
 
        err = apiCollector.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
@@ -90,7 +93,7 @@ func CollectEpics(taskCtx plugin.SubTaskContext) errors.Error {
                        for _, e := range reqData.Input.([]interface{}) {
                                epicKeys = append(epicKeys, *e.(*string))
                        }
-                       localJQL := fmt.Sprintf("issue in (%s) %s", 
strings.Join(epicKeys, ","), jql)
+                       localJQL := fmt.Sprintf("issue in (%s) and %s", 
strings.Join(epicKeys, ","), jql)
                        query.Set("jql", localJQL)
                        query.Set("startAt", fmt.Sprintf("%v", 
reqData.Pager.Skip))
                        query.Set("maxResults", fmt.Sprintf("%v", 
reqData.Pager.Size))
@@ -130,12 +133,12 @@ func GetEpicKeysIterator(db dal.Dal, data *JiraTaskData, batchSize int) (api.Ite
                dal.Join(`
                        LEFT JOIN _tool_jira_board_issues bi ON (
                        i.connection_id = bi.connection_id
-                       AND 
+                       AND
                        i.issue_id = bi.issue_id
                )`),
                dal.Where(`
                        i.connection_id = ?
-                       AND 
+                       AND
                        bi.board_id = ?
                        AND
                        i.epic_key != ''
diff --git a/backend/plugins/jira/tasks/epic_extractor.go b/backend/plugins/jira/tasks/epic_extractor.go
index 89f2ef70c..fa7a4d741 100644
--- a/backend/plugins/jira/tasks/epic_extractor.go
+++ b/backend/plugins/jira/tasks/epic_extractor.go
@@ -18,8 +18,6 @@ limitations under the License.
 package tasks
 
 import (
-       "encoding/json"
-
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/log"
@@ -39,12 +37,12 @@ var ExtractEpicsMeta = plugin.SubTaskMeta{
        DomainTypes:      []string{plugin.DOMAIN_TYPE_TICKET, 
plugin.DOMAIN_TYPE_CROSS},
 }
 
-func ExtractEpics(taskCtx plugin.SubTaskContext) errors.Error {
-       data := taskCtx.GetData().(*JiraTaskData)
-       db := taskCtx.GetDal()
+func ExtractEpics(subtaskCtx plugin.SubTaskContext) errors.Error {
+       data := subtaskCtx.GetData().(*JiraTaskData)
+       db := subtaskCtx.GetDal()
        connectionId := data.Options.ConnectionId
        boardId := data.Options.BoardId
-       logger := taskCtx.GetLogger()
+       logger := subtaskCtx.GetLogger()
        logger.Info("extract external epic Issues, connection_id=%d, 
board_id=%d", connectionId, boardId)
        mappings, err := getTypeMappings(data, db)
        if err != nil {
@@ -54,21 +52,40 @@ func ExtractEpics(taskCtx plugin.SubTaskContext) errors.Error {
        if err != nil {
                return err
        }
-       extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{
-               RawDataSubTaskArgs: api.RawDataSubTaskArgs{
-                       Ctx: taskCtx,
+
+       extractor, err := 
api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[apiv2models.Issue]{
+               SubtaskCommonArgs: &api.SubtaskCommonArgs{
+                       SubTaskContext: subtaskCtx,
+                       Table:          RAW_EPIC_TABLE,
                        Params: JiraApiParams{
                                ConnectionId: data.Options.ConnectionId,
                                BoardId:      data.Options.BoardId,
                        },
-                       Table: RAW_EPIC_TABLE,
+                       SubtaskConfig: map[string]any{
+                               "typeMappings":    mappings,
+                               "storyPointField": 
data.Options.ScopeConfig.StoryPointField,
+                       },
                },
-               Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
-                       apiIssue := &apiv2models.Issue{}
-                       err = errors.Convert(json.Unmarshal(row.Data, apiIssue))
-                       if err != nil {
-                               return nil, err
+               BeforeExtract: func(apiIssue *apiv2models.Issue, stateManager 
*api.SubtaskStateManager) errors.Error {
+                       if stateManager.IsIncremental() {
+                               err := db.Delete(
+                                       &models.JiraIssueLabel{},
+                                       dal.Where("connection_id = ? AND 
issue_id = ?", data.Options.ConnectionId, apiIssue.ID),
+                               )
+                               if err != nil {
+                                       return err
+                               }
+                               err = db.Delete(
+                                       &models.JiraIssueRelationship{},
+                                       dal.Where("connection_id = ? AND 
issue_id = ?", data.Options.ConnectionId, apiIssue.ID),
+                               )
+                               if err != nil {
+                                       return err
+                               }
                        }
+                       return nil
+               },
+               Extract: func(apiIssue *apiv2models.Issue, row *api.RawData) 
([]interface{}, errors.Error) {
                        return extractIssues(data, mappings, apiIssue, row, 
userFieldMap)
                },
        })
diff --git a/backend/server/api/api.go b/backend/server/api/api.go
index a2b590be4..a407cdce4 100644
--- a/backend/server/api/api.go
+++ b/backend/server/api/api.go
@@ -118,6 +118,7 @@ func SetupApiServer(router *gin.Engine) {
        // Required for `/projects/hello%20%2F%20world` to be parsed properly 
with `/projects/:projectName`
        // end up with `name = "hello / world"`
        router.UseRawPath = true
+       // router.UnescapePathValues = false
 
        // Endpoint to proceed database migration
        router.GET("/proceed-db-migration", func(ctx *gin.Context) {
diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go
index f0c7f3133..41e8d8c28 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -20,7 +20,6 @@ package services
 import (
        "context"
        "fmt"
-       "golang.org/x/sync/errgroup"
        "net/url"
        "os"
        "path/filepath"
@@ -28,6 +27,8 @@ import (
        "sync"
        "time"
 
+       "golang.org/x/sync/errgroup"
+
        "github.com/spf13/cast"
 
        "github.com/apache/incubator-devlake/core/dal"
@@ -107,7 +108,9 @@ func pipelineServiceInit() {
                pipelineMaxParallel = 10000
        }
        // run pipeline with independent goroutine
-       go RunPipelineInQueue(pipelineMaxParallel)
+       if cfg.GetBool("CONSUME_PIPELINES") {
+               go RunPipelineInQueue(pipelineMaxParallel)
+       }
 }
 
 func markInterruptedPipelineAs(status string) {

Reply via email to