This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 102d5f7a Nested Resource Incremental Collection (#2191)
102d5f7a is described below

commit 102d5f7a3f713e239e4d9f0207682cd4257e65b3
Author: Klesh Wong <[email protected]>
AuthorDate: Wed Jun 15 17:28:18 2022 +0800

    Nested Resource Incremental Collection (#2191)
    
    * fix: ignore '.so' files when loading plugins
    
    * fix: missing mock for method `Nested`
    
    * fix: worker errors were not caught
    
    * fix: worker_scheduler wouldn't halt on error
    
    * refactor: replace `GetDb` with `GetDal`
    
      Closes #2183
    
    * fix: unit test
    
    * refactor: nested resource incremental collection
    
      Closes #2189
    
    * fix: older changelogs wouldn't be collected
    
    * refactor: unify changelog.issue_updated values
---
 impl/dalgorm/dalgorm.go                            |  4 ++
 plugins/core/dal/dal.go                            | 16 +++++-
 plugins/jira/models/changelog.go                   |  3 +-
 plugins/jira/models/connection.go                  | 21 --------
 plugins/jira/models/issue.go                       |  1 -
 .../migrationscripts/updateSchemas20220615.go      | 62 ++++++++++++++++++++++
 plugins/jira/tasks/changelog_collector.go          | 39 ++++++++------
 plugins/jira/tasks/changelog_extractor.go          | 14 ++---
 plugins/jira/tasks/issue_extractor.go              | 60 +++++++++++----------
 plugins/jira/tasks/task_data.go                    | 15 ++++--
 10 files changed, 156 insertions(+), 79 deletions(-)

diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index b063f863..7f471982 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -61,6 +61,10 @@ func buildTx(tx *gorm.DB, clauses []dal.Clause) *gorm.DB {
                        }
                case dal.SelectClause:
                        tx = tx.Select(d.(string))
+               case dal.GroupbyClause:
+                       tx = tx.Group(d.(string))
+               case dal.HavingClause:
+                       tx = tx.Having(d.(dal.DalClause).Expr, 
d.(dal.DalClause).Params...)
                }
        }
        return tx
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 5f97df5a..bf8bfaea 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -109,7 +109,21 @@ func Select(fields string) Clause {
 
 const OrderbyClause string = "OrderBy"
 
-// Orderby creates a new Orderby
+// Orderby creates a new Orderby clause
 func Orderby(expr string) Clause {
        return Clause{Type: OrderbyClause, Data: expr}
 }
+
+const GroupbyClause string = "GroupBy"
+
+// Groupby creates a new Groupby clause
+func Groupby(expr string) Clause {
+       return Clause{Type: GroupbyClause, Data: expr}
+}
+
+const HavingClause string = "Having"
+
+// Having creates a new Having clause
+func Having(clause string, params ...interface{}) Clause {
+       return Clause{Type: HavingClause, Data: DalClause{clause, params}}
+}
diff --git a/plugins/jira/models/changelog.go b/plugins/jira/models/changelog.go
index 301197cd..bd965056 100644
--- a/plugins/jira/models/changelog.go
+++ b/plugins/jira/models/changelog.go
@@ -33,7 +33,8 @@ type JiraChangelog struct {
        AuthorAccountId   string `gorm:"type:varchar(255)"`
        AuthorDisplayName string `gorm:"type:varchar(255)"`
        AuthorActive      bool
-       Created           time.Time `gorm:"index"`
+       Created           time.Time  `gorm:"index"`
+       IssueUpdated      *time.Time `comment:"corresponding issue.updated 
time, changelog might need update IFF changelog.issue_updated < issue.updated"`
 }
 
 type JiraChangelogItem struct {
diff --git a/plugins/jira/models/connection.go 
b/plugins/jira/models/connection.go
index 1a4b7a14..fea93aaf 100644
--- a/plugins/jira/models/connection.go
+++ b/plugins/jira/models/connection.go
@@ -48,24 +48,3 @@ type JiraConnection struct {
 func (JiraConnection) TableName() string {
        return "_tool_jira_connections"
 }
-
-type JiraIssueTypeMapping struct {
-       ConnectionID uint64 `gorm:"primaryKey" json:"jiraConnectionId" 
validate:"required"`
-       UserType     string `gorm:"type:varchar(50);primaryKey" json:"userType" 
validate:"required"`
-       StandardType string `gorm:"type:varchar(50)" json:"standardType" 
validate:"required"`
-}
-
-func (JiraIssueTypeMapping) TableName() string {
-       return "_tool_jira_issue_type_mappings"
-}
-
-type JiraIssueStatusMapping struct {
-       ConnectionID   uint64 `gorm:"primaryKey" json:"jiraConnectionId" 
validate:"required"`
-       UserType       string `gorm:"type:varchar(50);primaryKey" 
json:"userType" validate:"required"`
-       UserStatus     string `gorm:"type:varchar(50);primaryKey" 
json:"userStatus" validate:"required"`
-       StandardStatus string `gorm:"type:varchar(50)" json:"standardStatus" 
validate:"required"`
-}
-
-func (JiraIssueStatusMapping) TableName() string {
-       return "_tool_jira_issue_status_mappings"
-}
diff --git a/plugins/jira/models/issue.go b/plugins/jira/models/issue.go
index a727099b..ae74e3ea 100644
--- a/plugins/jira/models/issue.go
+++ b/plugins/jira/models/issue.go
@@ -64,7 +64,6 @@ type JiraIssue struct {
        AllFields                datatypes.JSONMap
 
        // internal status tracking
-       ChangelogUpdated  *time.Time
        RemotelinkUpdated *time.Time
        WorklogUpdated    *time.Time
        common.NoPKModel
diff --git a/plugins/jira/models/migrationscripts/updateSchemas20220615.go 
b/plugins/jira/models/migrationscripts/updateSchemas20220615.go
new file mode 100644
index 00000000..f374ede6
--- /dev/null
+++ b/plugins/jira/models/migrationscripts/updateSchemas20220615.go
@@ -0,0 +1,62 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+       "context"
+       "time"
+
+       "gorm.io/gorm"
+)
+
+type UpdateSchemas20220615 struct {
+}
+
+type JiraIssue20220615 struct{}
+
+func (JiraIssue20220615) TableName() string {
+       return "_tool_jira_issues"
+}
+
+type JiraChangelog20220615 struct {
+       IssueUpdated *time.Time
+}
+
+func (JiraChangelog20220615) TableName() string {
+       return "_tool_jira_changelogs"
+}
+
+func (*UpdateSchemas20220615) Up(ctx context.Context, db *gorm.DB) error {
+       var err error
+       err = db.Migrator().DropColumn(&JiraIssue20220615{}, 
"changelog_updated")
+       if err != nil {
+               return err
+       }
+       err = db.Migrator().AutoMigrate(&JiraChangelog20220615{})
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (*UpdateSchemas20220615) Version() uint64 {
+       return 20220601154646
+}
+
+func (*UpdateSchemas20220615) Name() string {
+       return "replace issues.changelog_updated with changelogs.issue_updated"
+}
diff --git a/plugins/jira/tasks/changelog_collector.go 
b/plugins/jira/tasks/changelog_collector.go
index 22030f7d..ebbb9c44 100644
--- a/plugins/jira/tasks/changelog_collector.go
+++ b/plugins/jira/tasks/changelog_collector.go
@@ -25,7 +25,7 @@ import (
        "reflect"
 
        "github.com/apache/incubator-devlake/plugins/core"
-       . "github.com/apache/incubator-devlake/plugins/core/dal"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/apache/incubator-devlake/plugins/jira/models"
        "github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
@@ -40,26 +40,31 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
        if data.JiraServerInfo.DeploymentType == models.DeploymentServer {
                return nil
        }
+       log := taskCtx.GetLogger()
        db := taskCtx.GetDal()
-       // figure out the time range
-       since := data.Since
 
-       // filter out issue_ids that needed collection
-       clauses := []Clause{
-               Select("bi.issue_id, NOW() AS update_time"),
-               From("_tool_jira_board_issues bi"),
-               Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
-               Where(
-                       `bi.connection_id = ?
-                          AND bi.board_id = ?
-                          AND (i.changelog_updated IS NULL OR 
i.changelog_updated < i.updated)`,
-                       data.Options.ConnectionId,
-                       data.Options.BoardId,
-               ),
+       // query for issue_ids that need changelog collection
+       clauses := []dal.Clause{
+               dal.Select("i.issue_id, i.updated AS update_time"),
+               dal.From("_tool_jira_board_issues bi"),
+               dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
+               dal.Join("LEFT JOIN _tool_jira_changelogs c ON (c.connection_id 
= i.connection_id AND c.issue_id = i.issue_id)"),
+               dal.Where(`i.updated > i.created AND bi.connection_id = ?  AND 
bi.board_id = ?  `, data.Options.ConnectionId, data.Options.BoardId),
+               dal.Groupby("i.issue_id, i.updated"),
+               dal.Having("i.updated > max(c.issue_updated) OR  
max(c.issue_updated) IS NULL"),
        }
        // apply time range if any
+       since := data.Since
        if since != nil {
-               clauses = append(clauses, Where("i.updated > ?", *since))
+               clauses = append(clauses, dal.Where("i.updated > ?", *since))
+       }
+
+       if log.IsLevelEnabled(core.LOG_DEBUG) {
+               count, err := db.Count(clauses...)
+               if err != nil {
+                       return err
+               }
+               log.Debug("total number of issues to collect for: %d", count)
        }
 
        // construct the input iterator
@@ -85,7 +90,7 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
                },
                ApiClient:     data.ApiClient,
                PageSize:      100,
-               Incremental:   true,
+               Incremental:   since == nil,
                GetTotalPages: GetTotalPagesFromResponse,
                Input:         iterator,
                UrlTemplate:   "api/3/issue/{{ .Input.IssueId }}/changelog",
diff --git a/plugins/jira/tasks/changelog_extractor.go 
b/plugins/jira/tasks/changelog_extractor.go
index d46097f2..435cf501 100644
--- a/plugins/jira/tasks/changelog_extractor.go
+++ b/plugins/jira/tasks/changelog_extractor.go
@@ -33,7 +33,6 @@ func ExtractChangelogs(taskCtx core.SubTaskContext) error {
        if data.JiraServerInfo.DeploymentType == models.DeploymentServer {
                return nil
        }
-       db := taskCtx.GetDb()
        connectionId := data.Connection.ID
        extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
                RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
@@ -45,24 +44,25 @@ func ExtractChangelogs(taskCtx core.SubTaskContext) error {
                        Table: RAW_CHANGELOG_TABLE,
                },
                Extract: func(row *helper.RawData) ([]interface{}, error) {
+                       // process input
                        var input apiv2models.Input
                        err := json.Unmarshal(row.Input, &input)
                        if err != nil {
                                return nil, err
                        }
-                       var result []interface{}
                        var changelog apiv2models.Changelog
                        err = json.Unmarshal(row.Data, &changelog)
                        if err != nil {
                                return nil, err
                        }
-                       issue := &models.JiraIssue{ConnectionId: connectionId, 
IssueId: input.IssueId}
-                       err = db.Model(issue).Update("changelog_updated", 
input.UpdateTime).Error
-                       if err != nil {
-                               return nil, err
-                       }
+                       // prepare output
+                       var result []interface{}
                        cl, user := changelog.ToToolLayer(connectionId, 
input.IssueId)
+                       // this is crucial for incremental update
+                       cl.IssueUpdated = &input.UpdateTime
+                       // collect changelog / user information
                        result = append(result, cl, user)
+                       // collect changelog_items
                        for _, item := range changelog.Items {
                                result = append(result, 
item.ToToolLayer(connectionId, changelog.ID))
                                for _, u := range 
item.ExtractUser(connectionId) {
diff --git a/plugins/jira/tasks/issue_extractor.go 
b/plugins/jira/tasks/issue_extractor.go
index f9b2a15d..3c2cee56 100644
--- a/plugins/jira/tasks/issue_extractor.go
+++ b/plugins/jira/tasks/issue_extractor.go
@@ -19,9 +19,11 @@ package tasks
 
 import (
        "encoding/json"
-       "fmt"
+       "strconv"
        "strings"
+       "time"
 
+       "github.com/apache/incubator-devlake/models/domainlayer/ticket"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/apache/incubator-devlake/plugins/jira/models"
@@ -34,18 +36,19 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
        data := taskCtx.GetData().(*JiraTaskData)
        connectionId := data.Connection.ID
        boardId := data.Options.BoardId
-       db := taskCtx.GetDb()
        logger := taskCtx.GetLogger()
        logger.Info("extract Issues, connection_id=%d, board_id=%d", 
connectionId, boardId)
        // prepare getStdType function
-       var typeMappingRows []*models.JiraIssueTypeMapping
-       err := db.Find(&typeMappingRows, "connection_id = ?", 
connectionId).Error
-       if err != nil {
-               return err
-       }
+       // TODO: implement type mapping
        typeMappings := make(map[string]string)
-       for _, typeMappingRow := range typeMappingRows {
-               typeMappings[typeMappingRow.UserType] = 
typeMappingRow.StandardType
+       for _, userType := range 
data.Options.IssueExtraction.RequirementTypeMapping {
+               typeMappings[userType] = "REQUIREMENT"
+       }
+       for _, userType := range data.Options.IssueExtraction.BugTypeMapping {
+               typeMappings[userType] = "BUG"
+       }
+       for _, userType := range 
data.Options.IssueExtraction.IncidentTypeMapping {
+               typeMappings[userType] = "INCIDENT"
        }
        getStdType := func(userType string) string {
                stdType := typeMappings[userType]
@@ -54,20 +57,14 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
                }
                return strings.ToUpper(stdType)
        }
-       // prepare getStdStatus function
-       // TODO: status mapping is now not used
-       var statusMappingRows []*models.JiraIssueStatusMapping
-       err = db.Find(&statusMappingRows, "connection_id = ?", 
connectionId).Error
-       if err != nil {
-               return err
-       }
-       statusMappings := make(map[string]string)
-       makeStatusMappingKey := func(userType string, userStatus string) string 
{
-               return fmt.Sprintf("%v:%v", userType, userStatus)
-       }
-       for _, statusMappingRow := range statusMappingRows {
-               k := makeStatusMappingKey(statusMappingRow.UserType, 
statusMappingRow.UserStatus)
-               statusMappings[k] = statusMappingRow.StandardStatus
+       getStdStatus := func(statusKey string) string {
+               if statusKey == "done" {
+                       return ticket.DONE
+               } else if statusKey == "new" {
+                       return ticket.TODO
+               } else {
+                       return ticket.IN_PROGRESS
+               }
        }
 
        extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
@@ -111,17 +108,26 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
                        if issue.ResolutionDate != nil {
                                issue.LeadTimeMinutes = 
uint(issue.ResolutionDate.Unix()-issue.Created.Unix()) / 60
                        }
+                       if data.Options.IssueExtraction.StoryPointField != "" {
+                               strStoryPoint := 
apiIssue.Fields.AllFields[data.Options.IssueExtraction.StoryPointField].(string)
+                               issue.StoryPoint, _ = 
strconv.ParseFloat(strStoryPoint, 32)
+                       }
                        issue.StdStoryPoint = uint(issue.StoryPoint)
                        issue.StdType = getStdType(issue.Type)
-                       issue.StdStatus = GetStdStatus(issue.StatusKey)
-                       if len(changelogs) < 100 {
-                               issue.ChangelogUpdated = &row.CreatedAt
-                       }
+                       issue.StdStatus = getStdStatus(issue.StatusKey)
                        results = append(results, issue)
                        for _, worklog := range worklogs {
                                results = append(results, worklog)
                        }
+                       var issueUpdated *time.Time
+                       // likely this issue has more changelogs to be collected
+                       if len(changelogs) == 100 {
+                               issueUpdated = nil
+                       } else {
+                               issueUpdated = &issue.Updated
+                       }
                        for _, changelog := range changelogs {
+                               changelog.IssueUpdated = issueUpdated
                                results = append(results, changelog)
                        }
                        for _, changelogItem := range changelogItems {
diff --git a/plugins/jira/tasks/task_data.go b/plugins/jira/tasks/task_data.go
index 7b3d6b8c..23cc2957 100644
--- a/plugins/jira/tasks/task_data.go
+++ b/plugins/jira/tasks/task_data.go
@@ -25,10 +25,17 @@ import (
 )
 
 type JiraOptions struct {
-       ConnectionId uint64   `json:"connectionId"`
-       BoardId      uint64   `json:"boardId"`
-       Tasks        []string `json:"tasks,omitempty"`
-       Since        string
+       ConnectionId    uint64   `json:"connectionId"`
+       BoardId         uint64   `json:"boardId"`
+       Tasks           []string `json:"tasks,omitempty"`
+       Since           string
+       IssueExtraction struct {
+               RequirementTypeMapping []string
+               BugTypeMapping         []string
+               IncidentTypeMapping    []string
+               //EpicKeyField           string
+               StoryPointField string
+       }
 }
 
 type JiraTaskData struct {

Reply via email to