This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch fix#6075-2
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/fix#6075-2 by this push:
new d33e8e453 fix: unify processing and simplify logic
d33e8e453 is described below
commit d33e8e4532027f4b628b4381872c8c461e11ade9
Author: abeizn <[email protected]>
AuthorDate: Fri Sep 15 18:19:10 2023 +0800
fix: unify processing and simplify logic
---
.../pluginhelper/api/api_collector_with_state.go | 90 ++++++++++++++--------
.../plugins/github_graphql/tasks/pr_collector.go | 2 +-
.../plugins/gitlab/tasks/mr_detail_collector.go | 7 +-
backend/plugins/jira/tasks/worklog_collector.go | 16 +++-
4 files changed, 78 insertions(+), 37 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go
b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index a6b35e71c..978e03f97 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -37,9 +37,9 @@ type ApiCollectorStateManager struct {
// *GraphqlCollector
subtasks []plugin.SubTask
LatestState models.CollectorLatestState
- TimeAfter *time.Time
+ NewState models.CollectorLatestState
+ IsIncreamtal bool
Since *time.Time
- ExecuteStart time.Time
}
// NewStatefulApiCollector create a new ApiCollectorStateManager
@@ -50,11 +50,13 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager
if err != nil {
return nil, errors.Default.Wrap(err, "Couldn't resolve raw
subtask args")
}
- latestState := models.CollectorLatestState{}
- err = db.First(&latestState, dal.Where(`raw_data_table = ? AND
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
+
+ // CollectorLatestState retrieves the latest collector state from the
database
+ oldState := models.CollectorLatestState{}
+ err = db.First(&oldState, dal.Where(`raw_data_table = ? AND
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
if err != nil {
if db.IsErrorNotFound(err) {
- latestState = models.CollectorLatestState{
+ oldState = models.CollectorLatestState{
RawDataTable: rawDataSubTask.table,
RawDataParams: rawDataSubTask.params,
}
@@ -62,42 +64,70 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager
return nil, errors.Default.Wrap(err, "failed to load
JiraLatestCollectorMeta")
}
}
- var timeAfter *time.Time
+ // Extract timeAfter and latestSuccessStart from old state
+ oldTimeAfter := oldState.TimeAfter
+ oldLatestSuccessStart := oldState.LatestSuccessStart
+
+ // Get syncPolicy timeAfter and fullSync from context
+ var newTimeAfter *time.Time
+ var fullSync bool
syncPolicy := args.Ctx.TaskContext().SyncPolicy()
if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- timeAfter = syncPolicy.TimeAfter
+ newTimeAfter = syncPolicy.TimeAfter
+ fullSync = syncPolicy.FullSync
}
- since := GetSince(latestState.LatestSuccessStart, syncPolicy)
- return &ApiCollectorStateManager{
- RawDataSubTaskArgs: args,
- LatestState: latestState,
- TimeAfter: timeAfter,
- Since: since,
- ExecuteStart: time.Now(),
- }, nil
-}
+ // Calculate incremental and since based on syncPolicy and old state
+ var isIncreamtal bool
+ var since *time.Time
-func GetSince(latestSuccessStart *time.Time, syncPolicy *models.SyncPolicy)
*time.Time {
- // If the execution has not been successful before, return timeAfter
- if latestSuccessStart == nil {
- return syncPolicy.TimeAfter
+ // 1. If no oldState.LatestSuccessStart, not incremental and since is
syncPolicy.TimeAfter
+ if oldLatestSuccessStart == nil {
+ isIncreamtal = false
+ since = newTimeAfter
}
- // If syncPolicy is nil, return latestSuccessStart
+ // 2. If no syncPolicy, incremental and since is
oldState.LatestSuccessStart
if syncPolicy == nil {
- return latestSuccessStart
+ isIncreamtal = true
+ since = oldLatestSuccessStart
}
- // If syncPolicy is fullSync or timeAfter is after latestSuccessStart,
return timeAfter
- if syncPolicy.FullSync ||
syncPolicy.TimeAfter.After(*latestSuccessStart) {
- return syncPolicy.TimeAfter
+ // 3. If fullSync true, not incremental and since is
syncPolicy.TimeAfter
+ if syncPolicy != nil && fullSync {
+ isIncreamtal = false
+ since = newTimeAfter
+ }
+
+ // 4. If syncPolicy.TimeAfter is not nil, compare with old
oldState.TimeAfter
+ if newTimeAfter != nil {
+ if oldTimeAfter == nil ||
!syncPolicy.TimeAfter.Before(*oldTimeAfter) {
+ isIncreamtal = false
+ since = newTimeAfter
+ } else {
+ isIncreamtal = true
+ since = oldLatestSuccessStart
+ }
+ } else {
+ isIncreamtal = false
+ since = nil
}
- return latestSuccessStart
+ currentTime := time.Now()
+ oldState.LatestSuccessStart = &currentTime
+ oldState.TimeAfter = newTimeAfter
+
+ return &ApiCollectorStateManager{
+ RawDataSubTaskArgs: args,
+ NewState: oldState,
+ IsIncreamtal: isIncreamtal,
+ Since: since,
+ }, nil
+
}
// InitCollector init the embedded collector
func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs)
errors.Error {
args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
+ args.Incremental = m.IsIncreamtal
apiCollector, err := NewApiCollector(args)
if err != nil {
return err
@@ -127,10 +157,7 @@ func (m *ApiCollectorStateManager) Execute() errors.Error {
}
db := m.Ctx.GetDal()
- m.LatestState.LatestSuccessStart = &m.ExecuteStart
- m.LatestState.TimeAfter = m.TimeAfter
-
- return db.CreateOrUpdate(&m.LatestState)
+ return db.CreateOrUpdate(&m.NewState)
}
// NewStatefulApiCollectorForFinalizableEntity aims to add timeFilter/diffSync
support for
@@ -167,10 +194,12 @@ func NewStatefulApiCollectorForFinalizableEntity(args
FinalizableApiCollectorArg
}
createdAfter := manager.Since
+ isIncremental := manager.IsIncreamtal
// step 1: create a collector to collect newly added records
err = manager.InitCollector(ApiCollectorArgs{
ApiClient: args.ApiClient,
// common
+ Incremental: isIncremental,
UrlTemplate: args.CollectNewRecordsByList.UrlTemplate,
Query: func(reqData *RequestData) (url.Values, errors.Error) {
if args.CollectNewRecordsByList.Query != nil {
@@ -242,6 +271,7 @@ func NewStatefulApiCollectorForFinalizableEntity(args
FinalizableApiCollectorArg
err = manager.InitCollector(ApiCollectorArgs{
ApiClient: args.ApiClient,
// common
+ Incremental: true,
Input: input,
UrlTemplate: args.CollectUnfinishedDetails.UrlTemplate,
Query: func(reqData *RequestData) (url.Values, errors.Error) {
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go
b/backend/plugins/github_graphql/tasks/pr_collector.go
index 81437a451..25048af48 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -192,7 +192,7 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
isFinish := false
for _, rawL := range prs {
// collect data even though in increment mode
because of updating existing data
- if collectorWithState.TimeAfter != nil &&
!collectorWithState.TimeAfter.Before(rawL.UpdatedAt) {
+ if collectorWithState.LatestState.TimeAfter !=
nil && !collectorWithState.LatestState.TimeAfter.Before(rawL.UpdatedAt) {
isFinish = true
break
}
diff --git a/backend/plugins/gitlab/tasks/mr_detail_collector.go
b/backend/plugins/gitlab/tasks/mr_detail_collector.go
index 5b9af48dc..c74635942 100644
--- a/backend/plugins/gitlab/tasks/mr_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_detail_collector.go
@@ -85,11 +85,8 @@ func GetMergeRequestDetailsIterator(taskCtx
plugin.SubTaskContext, collectorWith
data.Options.ProjectId, data.Options.ConnectionId, true,
),
}
- if collectorWithState.LatestState.LatestSuccessStart != nil {
- clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.LatestState.LatestSuccessStart))
- } else if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.TimeAfter))
- }
+ clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.Since))
+
// construct the input iterator
cursor, err := db.Cursor(clauses...)
if err != nil {
diff --git a/backend/plugins/jira/tasks/worklog_collector.go
b/backend/plugins/jira/tasks/worklog_collector.go
index 22e4bd4d9..4a08d9b10 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -66,7 +66,21 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where("i.updated > i.created AND bi.connection_id = ? AND
bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
dal.Groupby("i.issue_id, i.updated"),
}
- clauses = append(clauses, dal.Having("i.updated > ? AND (i.updated >
max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND
COUNT(wl.worklog_id) > 0))", collectorWithState.Since))
+
+ isIncreamtal := collectorWithState.IsIncreamtal
+ if isIncreamtal {
+ clauses = append(clauses, dal.Having("i.updated > ? AND
(i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND
COUNT(wl.worklog_id) > 0))", collectorWithState.Since))
+ } else {
+ /*
+ i.updated > max(rl.issue_updated) was deleted because
for non-incremental collection,
+ max(rl.issue_updated) will only be one of null, less or
equal to i.updated
+ so i.updated > max(rl.issue_updated) is always false.
+ max(c.issue_updated) IS NULL AND COUNT(c.worklog_id) >
0 infers the issue has more than 100 worklogs,
+ because we collected worklogs when collecting issues,
and assign worklog.issue_updated if num of worklogs < 100,
+ and max(c.issue_updated) IS NULL AND
COUNT(c.worklog_id) > 0 means all worklogs for the issue were not assigned
issue_updated
+ */
+ clauses = append(clauses, dal.Having("max(wl.issue_updated) IS
NULL AND COUNT(wl.worklog_id) > 0"))
+ }
// construct the input iterator
cursor, err := db.Cursor(clauses...)