This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 0a507b06 fix: improve jira changelog and worklog collecting (#1960)
0a507b06 is described below
commit 0a507b06544679a5929153e6ba63ac4cabe3eded
Author: mindlesscloud <[email protected]>
AuthorDate: Fri May 20 22:35:26 2022 +0800
fix: improve jira changelog and worklog collecting (#1960)
* fix: improve jira changelog and worklog collecting
* fix: fix missing isremoved in
---
plugins/jira/jira.go | 7 ++++-
plugins/jira/models/issue.go | 1 +
.../update_schemas20220518.go} | 34 ++++++++++++++++++----
plugins/jira/tasks/changelog_collector.go | 15 +---------
plugins/jira/tasks/sprint_issues_convertor.go | 10 ++++---
plugins/jira/tasks/worklog_collector.go | 26 ++++-------------
plugins/jira/tasks/worklog_extractor.go | 14 ++++++++-
7 files changed, 61 insertions(+), 46 deletions(-)
diff --git a/plugins/jira/jira.go b/plugins/jira/jira.go
index 31e82922..561b14d3 100644
--- a/plugins/jira/jira.go
+++ b/plugins/jira/jira.go
@@ -127,7 +127,12 @@ func (plugin Jira) RootPkgPath() string {
}
func (plugin Jira) MigrationScripts() []migration.Script {
- return []migration.Script{new(migrationscripts.InitSchemas), new(migrationscripts.UpdateSchemas20220505), new(migrationscripts.UpdateSchemas20220507)}
+ return []migration.Script{
+ new(migrationscripts.InitSchemas),
+ new(migrationscripts.UpdateSchemas20220505),
+ new(migrationscripts.UpdateSchemas20220507),
+ new(migrationscripts.UpdateSchemas20220518),
+ }
}
func (plugin Jira) ApiResources() map[string]map[string]core.ApiResourceHandler {
diff --git a/plugins/jira/models/issue.go b/plugins/jira/models/issue.go
index 97cfaf1e..ceb960a3 100644
--- a/plugins/jira/models/issue.go
+++ b/plugins/jira/models/issue.go
@@ -49,6 +49,7 @@ type JiraIssue struct {
// internal status tracking
ChangelogUpdated *time.Time
RemotelinkUpdated *time.Time
+ WorklogUpdated *time.Time
common.NoPKModel
}
diff --git a/plugins/jira/models/issue.go b/plugins/jira/models/migrationscripts/update_schemas20220518.go
similarity index 74%
copy from plugins/jira/models/issue.go
copy to plugins/jira/models/migrationscripts/update_schemas20220518.go
index 97cfaf1e..4e947bb7 100644
--- a/plugins/jira/models/issue.go
+++ b/plugins/jira/models/migrationscripts/update_schemas20220518.go
@@ -1,15 +1,18 @@
-package models
+package migrationscripts
import (
+ "context"
"time"
- "github.com/merico-dev/lake/models/common"
"gorm.io/datatypes"
+ "gorm.io/gorm"
+
+ "github.com/merico-dev/lake/models/migrationscripts/archived"
)
-type JiraIssue struct {
+type JiraIssue20220518 struct {
// collected fields
- ConnectionId uint64 `gorm:"primaryKey"`
+ SourceId uint64 `gorm:"primaryKey"`
IssueId uint64 `gorm:"primarykey"`
ProjectId uint64
Self string `gorm:"type:varchar(255)"`
@@ -49,9 +52,28 @@ type JiraIssue struct {
// internal status tracking
ChangelogUpdated *time.Time
RemotelinkUpdated *time.Time
- common.NoPKModel
+ WorklogUpdated *time.Time
+ archived.NoPKModel
}
-func (JiraIssue) TableName() string {
+func (JiraIssue20220518) TableName() string {
return "_tool_jira_issues"
}
+
+type UpdateSchemas20220518 struct{}
+
+func (*UpdateSchemas20220518) Up(ctx context.Context, db *gorm.DB) error {
+ err := db.Migrator().AddColumn(&JiraIssue20220518{}, "worklog_updated")
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (*UpdateSchemas20220518) Version() uint64 {
+ return 20220518132510
+}
+
+func (*UpdateSchemas20220518) Name() string {
+ return "Add worklog_updated column to JiraIssue"
+}
diff --git a/plugins/jira/tasks/changelog_collector.go b/plugins/jira/tasks/changelog_collector.go
index d6f265d2..9ef753dc 100644
--- a/plugins/jira/tasks/changelog_collector.go
+++ b/plugins/jira/tasks/changelog_collector.go
@@ -25,19 +25,6 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
db := taskCtx.GetDb()
// figure out the time range
since := data.Since
- incremental := false
- if since == nil {
- // user didn't specify a time range to sync, try load from database
- var latestUpdated models.JiraChangelog
- err := db.Where("connection_id = ?", data.Connection.ID).Order("created DESC").Limit(1).Find(&latestUpdated).Error
- if err != nil {
- return fmt.Errorf("failed to get latest jira changelog record: %w", err)
- }
- if latestUpdated.ChangelogId > 0 {
- since = &latestUpdated.Created
- incremental = true
- }
- }
// filter out issue_ids that needed collection
tx := db.Table("_tool_jira_board_issues bi").
@@ -73,7 +60,7 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
},
ApiClient: data.ApiClient,
PageSize: 50,
- Incremental: incremental,
+ Incremental: true,
Input: iterator,
UrlTemplate: "api/3/issue/{{ .Input.IssueId }}/changelog",
Query: func(reqData *helper.RequestData) (url.Values, error) {
diff --git a/plugins/jira/tasks/sprint_issues_convertor.go b/plugins/jira/tasks/sprint_issues_convertor.go
index d2ae823c..e422780c 100644
--- a/plugins/jira/tasks/sprint_issues_convertor.go
+++ b/plugins/jira/tasks/sprint_issues_convertor.go
@@ -232,10 +232,12 @@ func (c *SprintIssuesConverter) handleTo(connectionId, sprintId uint64, cl Chang
return nil
}
if item, ok := c.sprintIssue[key]; ok {
- if item != nil && (item.AddedDate == nil || item.AddedDate != nil && item.AddedDate.After(cl.Created)) {
- item.AddedDate = &cl.Created
- item.AddedStage = addedStage
+ if item != nil {
item.IsRemoved = false
+ if item.AddedDate == nil || item.AddedDate.After(cl.Created) {
+ item.AddedDate = &cl.Created
+ item.AddedStage = addedStage
+ }
}
} else {
c.sprintIssue[key] = &ticket.SprintIssue{
@@ -336,7 +338,7 @@ func (c *SprintIssuesConverter) getJiraIssue(connectionId, issueId uint64) (*mod
}
func (c *SprintIssuesConverter) getStage(t time.Time, connectionId, sprintId uint64) (*string, error) {
- sprint, err := c.getJiraSprint(sprintId, connectionId)
+ sprint, err := c.getJiraSprint(connectionId, sprintId)
if err != nil {
return nil, err
}
diff --git a/plugins/jira/tasks/worklog_collector.go b/plugins/jira/tasks/worklog_collector.go
index 033f448f..7f5bed64 100644
--- a/plugins/jira/tasks/worklog_collector.go
+++ b/plugins/jira/tasks/worklog_collector.go
@@ -2,13 +2,11 @@ package tasks
import (
"encoding/json"
- "fmt"
"net/http"
"reflect"
"github.com/merico-dev/lake/plugins/core"
"github.com/merico-dev/lake/plugins/helper"
- "github.com/merico-dev/lake/plugins/jira/models"
"github.com/merico-dev/lake/plugins/jira/tasks/apiv2models"
)
@@ -18,29 +16,17 @@ func CollectWorklogs(taskCtx core.SubTaskContext) error {
db := taskCtx.GetDb()
data := taskCtx.GetData().(*JiraTaskData)
since := data.Since
- incremental := false
-
- if since == nil {
- var latestUpdated models.JiraWorklog
- err := db.Where("connection_id = ?", data.Connection.ID).Order("updated DESC").Limit(1).Find(&latestUpdated).Error
- if err != nil {
- return fmt.Errorf("failed to get latest jira issue worklog record: %w", err)
- }
- if latestUpdated.IssueId > 0 {
- since = &latestUpdated.Updated
- incremental = true
- }
- }
logger := taskCtx.GetLogger()
connectionId := data.Connection.ID
boardId := data.Options.BoardId
- tx := db.Model(&models.JiraIssue{}).
- Joins("left join _tool_jira_board_issues on _tool_jira_issues.issue_id = _tool_jira_board_issues.issue_id").
- Select("_tool_jira_board_issues.issue_id").Where("_tool_jira_board_issues.connection_id = ? AND _tool_jira_board_issues.board_id = ?", connectionId, boardId)
+ tx := db.Table("_tool_jira_board_issues bi").
+ Select("bi.issue_id, NOW() AS update_time").
+ Joins("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)").
+ Where("bi.connection_id = ? AND bi.board_id = ? AND (i.worklog_updated IS NULL OR i.worklog_updated < i.updated)", connectionId, boardId)
if since != nil {
- tx = tx.Where("_tool_jira_issues.updated > ?", since)
+ tx = tx.Where("i.updated > ?", since)
}
cursor, err := tx.Rows()
if err != nil {
@@ -64,7 +50,7 @@ func CollectWorklogs(taskCtx core.SubTaskContext) error {
ApiClient: data.ApiClient,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/worklog",
PageSize: 50,
- Incremental: incremental,
+ Incremental: true,
GetTotalPages: GetTotalPagesFromResponse,
ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
var data struct {
diff --git a/plugins/jira/tasks/worklog_extractor.go b/plugins/jira/tasks/worklog_extractor.go
index 4f0820c7..e9b0bf13 100644
--- a/plugins/jira/tasks/worklog_extractor.go
+++ b/plugins/jira/tasks/worklog_extractor.go
@@ -2,6 +2,7 @@ package tasks
import (
"encoding/json"
+ "github.com/merico-dev/lake/plugins/jira/models"
"github.com/merico-dev/lake/plugins/core"
"github.com/merico-dev/lake/plugins/helper"
@@ -12,6 +13,7 @@ var _ core.SubTaskEntryPoint = ExtractWorklogs
func ExtractWorklogs(taskCtx core.SubTaskContext) error {
data := taskCtx.GetData().(*JiraTaskData)
+ db := taskCtx.GetDb()
extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
Ctx: taskCtx,
@@ -22,8 +24,18 @@ func ExtractWorklogs(taskCtx core.SubTaskContext) error {
Table: RAW_WORKLOGS_TABLE,
},
Extract: func(row *helper.RawData) ([]interface{}, error) {
+ var input apiv2models.Input
+ err := json.Unmarshal(row.Input, &input)
+ if err != nil {
+ return nil, err
+ }
+ issue := &models.JiraIssue{ConnectionId: data.Connection.ID, IssueId: input.IssueId}
+ err = db.Model(issue).Update("worklog_updated", input.UpdateTime).Error
+ if err != nil {
+ return nil, err
+ }
var worklog apiv2models.Worklog
- err := json.Unmarshal(row.Data, &worklog)
+ err = json.Unmarshal(row.Data, &worklog)
if err != nil {
return nil, err
}