This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a507b06 fix: improve jira changelog and worklog collecting (#1960)
0a507b06 is described below

commit 0a507b06544679a5929153e6ba63ac4cabe3eded
Author: mindlesscloud <[email protected]>
AuthorDate: Fri May 20 22:35:26 2022 +0800

    fix: improve jira changelog and worklog collecting (#1960)
    
    * fix: improve jira changelog and worklog collecting
    
    * fix: fix missing isremoved in
---
 plugins/jira/jira.go                               |  7 ++++-
 plugins/jira/models/issue.go                       |  1 +
 .../update_schemas20220518.go}                     | 34 ++++++++++++++++++----
 plugins/jira/tasks/changelog_collector.go          | 15 +---------
 plugins/jira/tasks/sprint_issues_convertor.go      | 10 ++++---
 plugins/jira/tasks/worklog_collector.go            | 26 ++++-------------
 plugins/jira/tasks/worklog_extractor.go            | 14 ++++++++-
 7 files changed, 61 insertions(+), 46 deletions(-)

diff --git a/plugins/jira/jira.go b/plugins/jira/jira.go
index 31e82922..561b14d3 100644
--- a/plugins/jira/jira.go
+++ b/plugins/jira/jira.go
@@ -127,7 +127,12 @@ func (plugin Jira) RootPkgPath() string {
 }
 
 func (plugin Jira) MigrationScripts() []migration.Script {
-       return []migration.Script{new(migrationscripts.InitSchemas), new(migrationscripts.UpdateSchemas20220505), new(migrationscripts.UpdateSchemas20220507)}
+       return []migration.Script{
+               new(migrationscripts.InitSchemas),
+               new(migrationscripts.UpdateSchemas20220505),
+               new(migrationscripts.UpdateSchemas20220507),
+               new(migrationscripts.UpdateSchemas20220518),
+       }
 }
 
 func (plugin Jira) ApiResources() map[string]map[string]core.ApiResourceHandler {
diff --git a/plugins/jira/models/issue.go b/plugins/jira/models/issue.go
index 97cfaf1e..ceb960a3 100644
--- a/plugins/jira/models/issue.go
+++ b/plugins/jira/models/issue.go
@@ -49,6 +49,7 @@ type JiraIssue struct {
        // internal status tracking
        ChangelogUpdated  *time.Time
        RemotelinkUpdated *time.Time
+       WorklogUpdated    *time.Time
        common.NoPKModel
 }
 
diff --git a/plugins/jira/models/issue.go b/plugins/jira/models/migrationscripts/update_schemas20220518.go
similarity index 74%
copy from plugins/jira/models/issue.go
copy to plugins/jira/models/migrationscripts/update_schemas20220518.go
index 97cfaf1e..4e947bb7 100644
--- a/plugins/jira/models/issue.go
+++ b/plugins/jira/models/migrationscripts/update_schemas20220518.go
@@ -1,15 +1,18 @@
-package models
+package migrationscripts
 
 import (
+       "context"
        "time"
 
-       "github.com/merico-dev/lake/models/common"
        "gorm.io/datatypes"
+       "gorm.io/gorm"
+
+       "github.com/merico-dev/lake/models/migrationscripts/archived"
 )
 
-type JiraIssue struct {
+type JiraIssue20220518 struct {
        // collected fields
-       ConnectionId             uint64 `gorm:"primaryKey"`
+       SourceId                 uint64 `gorm:"primaryKey"`
        IssueId                  uint64 `gorm:"primarykey"`
        ProjectId                uint64
        Self                     string `gorm:"type:varchar(255)"`
@@ -49,9 +52,28 @@ type JiraIssue struct {
        // internal status tracking
        ChangelogUpdated  *time.Time
        RemotelinkUpdated *time.Time
-       common.NoPKModel
+       WorklogUpdated    *time.Time
+       archived.NoPKModel
 }
 
-func (JiraIssue) TableName() string {
+func (JiraIssue20220518) TableName() string {
        return "_tool_jira_issues"
 }
+
+type UpdateSchemas20220518 struct{}
+
+func (*UpdateSchemas20220518) Up(ctx context.Context, db *gorm.DB) error {
+       err := db.Migrator().AddColumn(&JiraIssue20220518{}, "worklog_updated")
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+func (*UpdateSchemas20220518) Version() uint64 {
+       return 20220518132510
+}
+
+func (*UpdateSchemas20220518) Name() string {
+       return "Add worklog_updated column to JiraIssue"
+}
diff --git a/plugins/jira/tasks/changelog_collector.go b/plugins/jira/tasks/changelog_collector.go
index d6f265d2..9ef753dc 100644
--- a/plugins/jira/tasks/changelog_collector.go
+++ b/plugins/jira/tasks/changelog_collector.go
@@ -25,19 +25,6 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
        db := taskCtx.GetDb()
        // figure out the time range
        since := data.Since
-       incremental := false
-       if since == nil {
-               // user didn't specify a time range to sync, try load from database
-               var latestUpdated models.JiraChangelog
-               err := db.Where("connection_id = ?", data.Connection.ID).Order("created DESC").Limit(1).Find(&latestUpdated).Error
-               if err != nil {
-                       return fmt.Errorf("failed to get latest jira changelog record: %w", err)
-               }
-               if latestUpdated.ChangelogId > 0 {
-                       since = &latestUpdated.Created
-                       incremental = true
-               }
-       }
 
        // filter out issue_ids that needed collection
        tx := db.Table("_tool_jira_board_issues bi").
@@ -73,7 +60,7 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
                },
                ApiClient:   data.ApiClient,
                PageSize:    50,
-               Incremental: incremental,
+               Incremental: true,
                Input:       iterator,
                UrlTemplate: "api/3/issue/{{ .Input.IssueId }}/changelog",
                Query: func(reqData *helper.RequestData) (url.Values, error) {
diff --git a/plugins/jira/tasks/sprint_issues_convertor.go b/plugins/jira/tasks/sprint_issues_convertor.go
index d2ae823c..e422780c 100644
--- a/plugins/jira/tasks/sprint_issues_convertor.go
+++ b/plugins/jira/tasks/sprint_issues_convertor.go
@@ -232,10 +232,12 @@ func (c *SprintIssuesConverter) handleTo(connectionId, sprintId uint64, cl Chang
                return nil
        }
        if item, ok := c.sprintIssue[key]; ok {
-               if item != nil && (item.AddedDate == nil || item.AddedDate != nil && item.AddedDate.After(cl.Created)) {
-                       item.AddedDate = &cl.Created
-                       item.AddedStage = addedStage
+               if item != nil {
                        item.IsRemoved = false
+                       if item.AddedDate == nil || item.AddedDate.After(cl.Created) {
+                               item.AddedDate = &cl.Created
+                               item.AddedStage = addedStage
+                       }
                }
        } else {
                c.sprintIssue[key] = &ticket.SprintIssue{
@@ -336,7 +338,7 @@ func (c *SprintIssuesConverter) getJiraIssue(connectionId, issueId uint64) (*mod
 }
 
 func (c *SprintIssuesConverter) getStage(t time.Time, connectionId, sprintId uint64) (*string, error) {
-       sprint, err := c.getJiraSprint(sprintId, connectionId)
+       sprint, err := c.getJiraSprint(connectionId, sprintId)
        if err != nil {
                return nil, err
        }
diff --git a/plugins/jira/tasks/worklog_collector.go b/plugins/jira/tasks/worklog_collector.go
index 033f448f..7f5bed64 100644
--- a/plugins/jira/tasks/worklog_collector.go
+++ b/plugins/jira/tasks/worklog_collector.go
@@ -2,13 +2,11 @@ package tasks
 
 import (
        "encoding/json"
-       "fmt"
        "net/http"
        "reflect"
 
        "github.com/merico-dev/lake/plugins/core"
        "github.com/merico-dev/lake/plugins/helper"
-       "github.com/merico-dev/lake/plugins/jira/models"
        "github.com/merico-dev/lake/plugins/jira/tasks/apiv2models"
 )
 
@@ -18,29 +16,17 @@ func CollectWorklogs(taskCtx core.SubTaskContext) error {
        db := taskCtx.GetDb()
        data := taskCtx.GetData().(*JiraTaskData)
        since := data.Since
-       incremental := false
-
-       if since == nil {
-               var latestUpdated models.JiraWorklog
-               err := db.Where("connection_id = ?", data.Connection.ID).Order("updated DESC").Limit(1).Find(&latestUpdated).Error
-               if err != nil {
-                       return fmt.Errorf("failed to get latest jira issue worklog record: %w", err)
-               }
-               if latestUpdated.IssueId > 0 {
-                       since = &latestUpdated.Updated
-                       incremental = true
-               }
-       }
 
        logger := taskCtx.GetLogger()
        connectionId := data.Connection.ID
        boardId := data.Options.BoardId
-       tx := db.Model(&models.JiraIssue{}).
-               Joins("left join _tool_jira_board_issues on _tool_jira_issues.issue_id = _tool_jira_board_issues.issue_id").
-               Select("_tool_jira_board_issues.issue_id").Where("_tool_jira_board_issues.connection_id = ? AND _tool_jira_board_issues.board_id = ?", connectionId, boardId)
+       tx := db.Table("_tool_jira_board_issues bi").
+               Select("bi.issue_id, NOW() AS update_time").
+               Joins("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)").
+               Where("bi.connection_id = ? AND bi.board_id = ? AND (i.worklog_updated IS NULL OR i.worklog_updated < i.updated)", connectionId, boardId)
 
        if since != nil {
-               tx = tx.Where("_tool_jira_issues.updated > ?", since)
+               tx = tx.Where("i.updated > ?", since)
        }
        cursor, err := tx.Rows()
        if err != nil {
@@ -64,7 +50,7 @@ func CollectWorklogs(taskCtx core.SubTaskContext) error {
                ApiClient:     data.ApiClient,
                UrlTemplate:   "api/2/issue/{{ .Input.IssueId }}/worklog",
                PageSize:      50,
-               Incremental:   incremental,
+               Incremental:   true,
                GetTotalPages: GetTotalPagesFromResponse,
                 ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
                        var data struct {
diff --git a/plugins/jira/tasks/worklog_extractor.go b/plugins/jira/tasks/worklog_extractor.go
index 4f0820c7..e9b0bf13 100644
--- a/plugins/jira/tasks/worklog_extractor.go
+++ b/plugins/jira/tasks/worklog_extractor.go
@@ -2,6 +2,7 @@ package tasks
 
 import (
        "encoding/json"
+       "github.com/merico-dev/lake/plugins/jira/models"
 
        "github.com/merico-dev/lake/plugins/core"
        "github.com/merico-dev/lake/plugins/helper"
@@ -12,6 +13,7 @@ var _ core.SubTaskEntryPoint = ExtractWorklogs
 
 func ExtractWorklogs(taskCtx core.SubTaskContext) error {
        data := taskCtx.GetData().(*JiraTaskData)
+       db := taskCtx.GetDb()
        extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
                RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
                        Ctx: taskCtx,
@@ -22,8 +24,18 @@ func ExtractWorklogs(taskCtx core.SubTaskContext) error {
                        Table: RAW_WORKLOGS_TABLE,
                },
                Extract: func(row *helper.RawData) ([]interface{}, error) {
+                       var input apiv2models.Input
+                       err := json.Unmarshal(row.Input, &input)
+                       if err != nil {
+                               return nil, err
+                       }
+                       issue := &models.JiraIssue{ConnectionId: data.Connection.ID, IssueId: input.IssueId}
+                       err = db.Model(issue).Update("worklog_updated", input.UpdateTime).Error
+                       if err != nil {
+                               return nil, err
+                       }
                        var worklog apiv2models.Worklog
-                       err := json.Unmarshal(row.Data, &worklog)
+                       err = json.Unmarshal(row.Data, &worklog)
                        if err != nil {
                                return nil, err
                        }

Reply via email to