This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch fix#6075-2
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/fix#6075-2 by this push:
     new d33e8e453 fix: unify processing and simplify logic
d33e8e453 is described below

commit d33e8e4532027f4b628b4381872c8c461e11ade9
Author: abeizn <[email protected]>
AuthorDate: Fri Sep 15 18:19:10 2023 +0800

    fix: unify processing and simplify logic
---
 .../pluginhelper/api/api_collector_with_state.go   | 90 ++++++++++++++--------
 .../plugins/github_graphql/tasks/pr_collector.go   |  2 +-
 .../plugins/gitlab/tasks/mr_detail_collector.go    |  7 +-
 backend/plugins/jira/tasks/worklog_collector.go    | 16 +++-
 4 files changed, 78 insertions(+), 37 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go 
b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index a6b35e71c..978e03f97 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -37,9 +37,9 @@ type ApiCollectorStateManager struct {
        // *GraphqlCollector
        subtasks     []plugin.SubTask
        LatestState  models.CollectorLatestState
-       TimeAfter    *time.Time
+       NewState     models.CollectorLatestState
+       IsIncreamtal bool
        Since        *time.Time
-       ExecuteStart time.Time
 }
 
 // NewStatefulApiCollector create a new ApiCollectorStateManager
@@ -50,11 +50,13 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs) 
(*ApiCollectorStateManager
        if err != nil {
                return nil, errors.Default.Wrap(err, "Couldn't resolve raw 
subtask args")
        }
-       latestState := models.CollectorLatestState{}
-       err = db.First(&latestState, dal.Where(`raw_data_table = ? AND 
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
+
+       // Retrieve the latest collector state for this subtask from the 
database
+       oldState := models.CollectorLatestState{}
+       err = db.First(&oldState, dal.Where(`raw_data_table = ? AND 
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
        if err != nil {
                if db.IsErrorNotFound(err) {
-                       latestState = models.CollectorLatestState{
+                       oldState = models.CollectorLatestState{
                                RawDataTable:  rawDataSubTask.table,
                                RawDataParams: rawDataSubTask.params,
                        }
@@ -62,42 +64,70 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs) 
(*ApiCollectorStateManager
                        return nil, errors.Default.Wrap(err, "failed to load 
JiraLatestCollectorMeta")
                }
        }
-       var timeAfter *time.Time
+       // Extract timeAfter and latestSuccessStart from old state
+       oldTimeAfter := oldState.TimeAfter
+       oldLatestSuccessStart := oldState.LatestSuccessStart
+
+       // Get syncPolicy timeAfter and fullSync from context
+       var newTimeAfter *time.Time
+       var fullSync bool
        syncPolicy := args.Ctx.TaskContext().SyncPolicy()
        if syncPolicy != nil && syncPolicy.TimeAfter != nil {
-               timeAfter = syncPolicy.TimeAfter
+               newTimeAfter = syncPolicy.TimeAfter
+               fullSync = syncPolicy.FullSync
        }
-       since := GetSince(latestState.LatestSuccessStart, syncPolicy)
 
-       return &ApiCollectorStateManager{
-               RawDataSubTaskArgs: args,
-               LatestState:        latestState,
-               TimeAfter:          timeAfter,
-               Since:              since,
-               ExecuteStart:       time.Now(),
-       }, nil
-}
+       // Calculate incremental and since based on syncPolicy and old state
+       var isIncreamtal bool
+       var since *time.Time
 
-func GetSince(latestSuccessStart *time.Time, syncPolicy *models.SyncPolicy) 
*time.Time {
-       // If the execution has not been successful before, return timeAfter
-       if latestSuccessStart == nil {
-               return syncPolicy.TimeAfter
+       // 1. If no oldState.LatestSuccessStart, not incremental and since is 
syncPolicy.TimeAfter
+       if oldLatestSuccessStart == nil {
+               isIncreamtal = false
+               since = newTimeAfter
        }
-       // If syncPolicy is nil, return latestSuccessStart
+       // 2. If no syncPolicy, incremental and since is 
oldState.LatestSuccessStart
        if syncPolicy == nil {
-               return latestSuccessStart
+               isIncreamtal = true
+               since = oldLatestSuccessStart
        }
-       // If syncPolicy is fullSync or timeAfter is after latestSuccessStart, 
return timeAfter
-       if syncPolicy.FullSync || 
syncPolicy.TimeAfter.After(*latestSuccessStart) {
-               return syncPolicy.TimeAfter
+       // 3. If fullSync true, not incremental and since is 
syncPolicy.TimeAfter
+       if syncPolicy != nil && fullSync {
+               isIncreamtal = false
+               since = newTimeAfter
+       }
+
+       // 4. If syncPolicy.TimeAfter is not nil, compare it with 
oldState.TimeAfter
+       if newTimeAfter != nil {
+               if oldTimeAfter == nil || 
!syncPolicy.TimeAfter.Before(*oldTimeAfter) {
+                       isIncreamtal = false
+                       since = newTimeAfter
+               } else {
+                       isIncreamtal = true
+                       since = oldLatestSuccessStart
+               }
+       } else {
+               isIncreamtal = false
+               since = nil
        }
 
-       return latestSuccessStart
+       currentTime := time.Now()
+       oldState.LatestSuccessStart = &currentTime
+       oldState.TimeAfter = newTimeAfter
+
+       return &ApiCollectorStateManager{
+               RawDataSubTaskArgs: args,
+               NewState:           oldState,
+               IsIncreamtal:       isIncreamtal,
+               Since:              since,
+       }, nil
+
 }
 
 // InitCollector init the embedded collector
 func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) 
errors.Error {
        args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
+       args.Incremental = m.IsIncreamtal
        apiCollector, err := NewApiCollector(args)
        if err != nil {
                return err
@@ -127,10 +157,7 @@ func (m *ApiCollectorStateManager) Execute() errors.Error {
        }
 
        db := m.Ctx.GetDal()
-       m.LatestState.LatestSuccessStart = &m.ExecuteStart
-       m.LatestState.TimeAfter = m.TimeAfter
-
-       return db.CreateOrUpdate(&m.LatestState)
+       return db.CreateOrUpdate(&m.NewState)
 }
 
 // NewStatefulApiCollectorForFinalizableEntity aims to add timeFilter/diffSync 
support for
@@ -167,10 +194,12 @@ func NewStatefulApiCollectorForFinalizableEntity(args 
FinalizableApiCollectorArg
        }
 
        createdAfter := manager.Since
+       isIncremental := manager.IsIncreamtal
        // step 1: create a collector to collect newly added records
        err = manager.InitCollector(ApiCollectorArgs{
                ApiClient: args.ApiClient,
                // common
+               Incremental: isIncremental,
                UrlTemplate: args.CollectNewRecordsByList.UrlTemplate,
                Query: func(reqData *RequestData) (url.Values, errors.Error) {
                        if args.CollectNewRecordsByList.Query != nil {
@@ -242,6 +271,7 @@ func NewStatefulApiCollectorForFinalizableEntity(args 
FinalizableApiCollectorArg
        err = manager.InitCollector(ApiCollectorArgs{
                ApiClient: args.ApiClient,
                // common
+               Incremental: true,
                Input:       input,
                UrlTemplate: args.CollectUnfinishedDetails.UrlTemplate,
                Query: func(reqData *RequestData) (url.Values, errors.Error) {
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go 
b/backend/plugins/github_graphql/tasks/pr_collector.go
index 81437a451..25048af48 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -192,7 +192,7 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
                        isFinish := false
                        for _, rawL := range prs {
                                // collect data even though in increment mode 
because of updating existing data
-                               if collectorWithState.TimeAfter != nil && 
!collectorWithState.TimeAfter.Before(rawL.UpdatedAt) {
+                               if collectorWithState.LatestState.TimeAfter != 
nil && !collectorWithState.LatestState.TimeAfter.Before(rawL.UpdatedAt) {
                                        isFinish = true
                                        break
                                }
diff --git a/backend/plugins/gitlab/tasks/mr_detail_collector.go 
b/backend/plugins/gitlab/tasks/mr_detail_collector.go
index 5b9af48dc..c74635942 100644
--- a/backend/plugins/gitlab/tasks/mr_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_detail_collector.go
@@ -85,11 +85,8 @@ func GetMergeRequestDetailsIterator(taskCtx 
plugin.SubTaskContext, collectorWith
                        data.Options.ProjectId, data.Options.ConnectionId, true,
                ),
        }
-       if collectorWithState.LatestState.LatestSuccessStart != nil {
-               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
-       } else if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.TimeAfter))
-       }
+       clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.Since))
+
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
        if err != nil {
diff --git a/backend/plugins/jira/tasks/worklog_collector.go 
b/backend/plugins/jira/tasks/worklog_collector.go
index 22e4bd4d9..4a08d9b10 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -66,7 +66,21 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where("i.updated > i.created AND bi.connection_id = ?  AND 
bi.board_id = ?  ", data.Options.ConnectionId, data.Options.BoardId),
                dal.Groupby("i.issue_id, i.updated"),
        }
-       clauses = append(clauses, dal.Having("i.updated > ? AND (i.updated > 
max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND 
COUNT(wl.worklog_id) > 0))", collectorWithState.Since))
+
+       isIncreamtal := collectorWithState.IsIncreamtal
+       if isIncreamtal {
+               clauses = append(clauses, dal.Having("i.updated > ? AND 
(i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND 
COUNT(wl.worklog_id) > 0))", collectorWithState.Since))
+       } else {
+               /*
+                       i.updated > max(wl.issue_updated) was deleted because 
for non-incremental collection,
+                       max(wl.issue_updated) will only be one of null, less or 
equal to i.updated
+                       so i.updated > max(wl.issue_updated) is always false.
+                       max(wl.issue_updated) IS NULL AND COUNT(wl.worklog_id) > 
0 implies the issue has more than 100 worklogs,
+                       because we collected worklogs when collecting issues, 
and assign worklog.issue_updated if num of worklogs < 100,
+                       and max(wl.issue_updated) IS NULL AND 
COUNT(wl.worklog_id) > 0 means all worklogs for the issue were not assigned 
issue_updated
+               */
+               clauses = append(clauses, dal.Having("max(wl.issue_updated) IS 
NULL AND COUNT(wl.worklog_id) > 0"))
+       }
 
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)

Reply via email to