This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 3b8e9687f refactor: use since instead of
timeafter,incremental,latestSuccessStart (#6094)
3b8e9687f is described below
commit 3b8e9687f7221729146759f4dd06affe2da29fa4
Author: abeizn <[email protected]>
AuthorDate: Tue Sep 19 11:14:09 2023 +0800
refactor: use since instead of timeafter,incremental,latestSuccessStart
(#6094)
* refactor: use since instead of timeafter,incremental,latestSuccessStart
* fix: unify processing and simplify logic
* fix: unify processing and simplify logic
* fix: lint
* fix: when since is nil and add collector IsIncreamtal condition
* fix: since logic
* fix: remove LatestState
* fix: remove some isIncremental
---------
Co-authored-by: Klesh Wong <[email protected]>
---
.../pluginhelper/api/api_collector_with_state.go | 94 ++++++++++++----------
backend/plugins/bitbucket/tasks/api_common.go | 23 +++---
backend/plugins/bitbucket/tasks/issue_collector.go | 1 -
.../bitbucket/tasks/issue_comment_collector.go | 1 -
.../plugins/bitbucket/tasks/pipeline_collector.go | 1 -
.../bitbucket/tasks/pipeline_steps_collector.go | 1 -
backend/plugins/bitbucket/tasks/pr_collector.go | 1 -
.../bitbucket/tasks/pr_comment_collector.go | 1 -
.../plugins/bitbucket/tasks/pr_commit_collector.go | 1 -
backend/plugins/github/tasks/cicd_job_collector.go | 10 +--
backend/plugins/github/tasks/comment_collector.go | 12 +--
backend/plugins/github/tasks/commit_collector.go | 13 +--
backend/plugins/github/tasks/issue_collector.go | 13 +--
.../plugins/github/tasks/pr_commit_collector.go | 15 ++--
.../plugins/github/tasks/pr_review_collector.go | 13 ++-
.../github/tasks/pr_review_comment_collector.go | 15 +---
.../github_graphql/tasks/issue_collector.go | 8 +-
.../plugins/github_graphql/tasks/job_collector.go | 8 +-
.../plugins/github_graphql/tasks/pr_collector.go | 4 +-
backend/plugins/gitlab/tasks/issue_collector.go | 13 +--
backend/plugins/gitlab/tasks/mr_collector.go | 9 +--
.../plugins/gitlab/tasks/mr_detail_collector.go | 7 +-
backend/plugins/gitlab/tasks/mr_note_collector.go | 3 -
backend/plugins/gitlab/tasks/pipeline_collector.go | 10 +--
.../gitlab/tasks/pipeline_detail_collector.go | 7 +-
backend/plugins/gitlab/tasks/shared.go | 4 +-
backend/plugins/jenkins/tasks/stage_collector.go | 7 +-
.../jira/tasks/development_panel_collector.go | 14 +---
.../jira/tasks/issue_changelog_collector.go | 9 +--
backend/plugins/jira/tasks/issue_collector.go | 29 +++----
backend/plugins/jira/tasks/issue_collector_test.go | 56 ++-----------
.../plugins/jira/tasks/issue_comment_collector.go | 10 +--
backend/plugins/jira/tasks/remotelink_collector.go | 10 +--
backend/plugins/jira/tasks/worklog_collector.go | 17 ++--
.../plugins/tapd/tasks/bug_changelog_collector.go | 9 +--
backend/plugins/tapd/tasks/bug_collector.go | 10 +--
backend/plugins/tapd/tasks/bug_commit_collector.go | 10 +--
backend/plugins/tapd/tasks/iteration_collector.go | 10 +--
backend/plugins/tapd/tasks/story_bug_collector.go | 10 +--
.../tapd/tasks/story_changelog_collector.go | 9 +--
backend/plugins/tapd/tasks/story_collector.go | 10 +--
.../plugins/tapd/tasks/story_commit_collector.go | 9 +--
.../plugins/tapd/tasks/task_changelog_collector.go | 9 +--
backend/plugins/tapd/tasks/task_collector.go | 10 +--
.../plugins/tapd/tasks/task_commit_collector.go | 9 +--
backend/plugins/tapd/tasks/worklog_collector.go | 10 +--
.../plugins/zentao/tasks/bug_commits_collector.go | 10 +--
.../zentao/tasks/story_commits_collector.go | 10 +--
.../plugins/zentao/tasks/task_commits_collector.go | 10 +--
49 files changed, 184 insertions(+), 401 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go
b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 813a75f73..25471612b 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -36,9 +36,9 @@ type ApiCollectorStateManager struct {
// *ApiCollector
// *GraphqlCollector
subtasks []plugin.SubTask
- LatestState models.CollectorLatestState
- TimeAfter *time.Time
- ExecuteStart time.Time
+ newState models.CollectorLatestState
+ IsIncreamtal bool
+ Since *time.Time
}
// NewStatefulApiCollector create a new ApiCollectorStateManager
@@ -49,11 +49,13 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager
if err != nil {
return nil, errors.Default.Wrap(err, "Couldn't resolve raw
subtask args")
}
- latestState := models.CollectorLatestState{}
- err = db.First(&latestState, dal.Where(`raw_data_table = ? AND
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
+
+ // CollectorLatestState retrieves the latest collector state from the
database
+ oldState := models.CollectorLatestState{}
+ err = db.First(&oldState, dal.Where(`raw_data_table = ? AND
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
if err != nil {
if db.IsErrorNotFound(err) {
- latestState = models.CollectorLatestState{
+ oldState = models.CollectorLatestState{
RawDataTable: rawDataSubTask.table,
RawDataParams: rawDataSubTask.params,
}
@@ -61,42 +63,57 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager
return nil, errors.Default.Wrap(err, "failed to load
JiraLatestCollectorMeta")
}
}
- var timeAfter *time.Time
+ // Extract timeAfter and latestSuccessStart from old state
+ oldTimeAfter := oldState.TimeAfter
+ oldLatestSuccessStart := oldState.LatestSuccessStart
+
+ // Calculate incremental and since based on syncPolicy and old state
syncPolicy := args.Ctx.TaskContext().SyncPolicy()
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- timeAfter = syncPolicy.TimeAfter
+ var isIncreamtal bool
+ var since *time.Time
+
+ if syncPolicy == nil {
+ // 1. If no syncPolicy, incremental and since is
oldState.LatestSuccessStart
+ isIncreamtal = true
+ since = oldLatestSuccessStart
+ } else if oldLatestSuccessStart == nil {
+ // 2. If no oldState.LatestSuccessStart, not incremental and
since is syncPolicy.TimeAfter
+ isIncreamtal = false
+ since = syncPolicy.TimeAfter
+ } else if syncPolicy.FullSync {
+ // 3. If fullSync true, not incremental and since is
syncPolicy.TimeAfter
+ isIncreamtal = false
+ since = syncPolicy.TimeAfter
+ } else if syncPolicy.TimeAfter != nil {
+ // 4. If syncPolicy.TimeAfter not nil
+ if oldTimeAfter != nil &&
syncPolicy.TimeAfter.Before(*oldTimeAfter) {
+ // 4.1 If oldTimeAfter not nil and syncPolicy.TimeAfter
before oldTimeAfter, incremental is false and since is syncPolicy.TimeAfter
+ isIncreamtal = false
+ since = syncPolicy.TimeAfter
+ } else {
+ // 4.2 If oldTimeAfter nil or syncPolicy.TimeAfter
after oldTimeAfter, incremental is true and since is oldState.LatestSuccessStart
+ isIncreamtal = true
+ since = oldLatestSuccessStart
+ }
}
+
+ currentTime := time.Now()
+ oldState.LatestSuccessStart = ¤tTime
+ oldState.TimeAfter = syncPolicy.TimeAfter
+
return &ApiCollectorStateManager{
RawDataSubTaskArgs: args,
- LatestState: latestState,
- TimeAfter: timeAfter,
- ExecuteStart: time.Now(),
+ newState: oldState,
+ IsIncreamtal: isIncreamtal,
+ Since: since,
}, nil
-}
-// IsIncremental indicates if the collector should operate in incremental mode
-func (m *ApiCollectorStateManager) IsIncremental() bool {
- prevSyncTime := m.LatestState.LatestSuccessStart
- prevTimeAfter := m.LatestState.TimeAfter
- syncPolicy := m.Ctx.TaskContext().SyncPolicy()
- if syncPolicy != nil && syncPolicy.FullSync {
- return false
- }
-
- if prevSyncTime == nil {
- return false
- }
- // if we cleared the timeAfter, or moved timeAfter back in time, we
should do a full sync
- currTimeAfter := syncPolicy.TimeAfter
- if currTimeAfter != nil {
- return prevTimeAfter == nil ||
!currTimeAfter.Before(*prevTimeAfter)
- }
- return prevTimeAfter == nil
}
// InitCollector init the embedded collector
func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs)
errors.Error {
args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
+ args.Incremental = m.IsIncreamtal
apiCollector, err := NewApiCollector(args)
if err != nil {
return err
@@ -126,10 +143,7 @@ func (m *ApiCollectorStateManager) Execute() errors.Error {
}
db := m.Ctx.GetDal()
- m.LatestState.LatestSuccessStart = &m.ExecuteStart
- m.LatestState.TimeAfter = m.TimeAfter
-
- return db.CreateOrUpdate(&m.LatestState)
+ return db.CreateOrUpdate(&m.newState)
}
// NewStatefulApiCollectorForFinalizableEntity aims to add timeFilter/diffSync
support for
@@ -165,14 +179,8 @@ func NewStatefulApiCollectorForFinalizableEntity(args
FinalizableApiCollectorArg
return nil, err
}
- var isIncremental = manager.IsIncremental()
- var createdAfter *time.Time
- if isIncremental {
- createdAfter = manager.LatestState.LatestSuccessStart
- } else {
- createdAfter = manager.TimeAfter
- }
-
+ createdAfter := manager.Since
+ isIncremental := manager.IsIncreamtal
// step 1: create a collector to collect newly added records
err = manager.InitCollector(ApiCollectorArgs{
ApiClient: args.ApiClient,
diff --git a/backend/plugins/bitbucket/tasks/api_common.go
b/backend/plugins/bitbucket/tasks/api_common.go
index 284df3df2..7e2e76775 100644
--- a/backend/plugins/bitbucket/tasks/api_common.go
+++ b/backend/plugins/bitbucket/tasks/api_common.go
@@ -101,14 +101,10 @@ func GetQueryCreatedAndUpdated(fields string,
collectorWithState *api.ApiCollect
}
query.Set("fields", fields)
query.Set("sort", "created_on")
- if collectorWithState.IsIncremental() {
- latestSuccessStart :=
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339)
- query.Set("q", fmt.Sprintf("updated_on>=%s",
latestSuccessStart))
- } else if collectorWithState.TimeAfter != nil {
- timeAfter :=
collectorWithState.TimeAfter.Format(time.RFC3339)
- query.Set("q", fmt.Sprintf("updated_on>=%s", timeAfter))
- }
+ if collectorWithState.Since != nil {
+ query.Set("q", fmt.Sprintf("updated_on>=%s",
collectorWithState.Since.Format(time.RFC3339)))
+ }
return query, nil
}
}
@@ -179,9 +175,10 @@ func GetPullRequestsIterator(taskCtx
plugin.SubTaskContext, collectorWithState *
data.Options.FullName, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncremental() {
- clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.Since))
}
+
// construct the input iterator
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -202,8 +199,8 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *api.Ap
data.Options.FullName, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncremental() {
- clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.Since))
}
// construct the input iterator
cursor, err := db.Cursor(clauses...)
@@ -225,8 +222,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *api
data.Options.FullName, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncremental() {
- clauses = append(clauses, dal.Where("bitbucket_complete_on >
?", *collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("bitbucket_complete_on >
?", *collectorWithState.Since))
}
// construct the input iterator
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/bitbucket/tasks/issue_collector.go
b/backend/plugins/bitbucket/tasks/issue_collector.go
index a5f2e4a41..3edec1bea 100644
--- a/backend/plugins/bitbucket/tasks/issue_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_collector.go
@@ -43,7 +43,6 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: collectorWithState.IsIncremental(),
UrlTemplate: "repositories/{{ .Params.FullName }}/issues",
Query: GetQueryCreatedAndUpdated(
`values.type,values.id,values.links.self,`+
diff --git a/backend/plugins/bitbucket/tasks/issue_comment_collector.go
b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
index 8e0036c4a..cdc9c2e3d 100644
--- a/backend/plugins/bitbucket/tasks/issue_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
@@ -50,7 +50,6 @@ func CollectApiIssueComments(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: collectorWithState.IsIncremental(),
Input: iterator,
UrlTemplate: "repositories/{{ .Params.FullName }}/issues/{{
.Input.BitbucketId }}/comments",
Query: GetQueryFields(
diff --git a/backend/plugins/bitbucket/tasks/pipeline_collector.go
b/backend/plugins/bitbucket/tasks/pipeline_collector.go
index e25861936..d11a7f243 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_collector.go
@@ -43,7 +43,6 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 50,
- Incremental: collectorWithState.IsIncremental(),
UrlTemplate: "repositories/{{ .Params.FullName }}/pipelines/",
Query: GetQueryCreatedAndUpdated(
`values.uuid,values.type,values.state.name,values.state.result.name,values.state.result.type,values.state.stage.name,values.state.stage.type,`+
diff --git a/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
index 99cc1e39b..8c9d70aa7 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
@@ -52,7 +52,6 @@ func CollectPipelineSteps(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: collectorWithState.IsIncremental(),
Input: iterator,
UrlTemplate: "repositories/{{ .Params.FullName }}/pipelines/{{
.Input.BitbucketId }}/steps/",
Query: GetQueryFields(
diff --git a/backend/plugins/bitbucket/tasks/pr_collector.go
b/backend/plugins/bitbucket/tasks/pr_collector.go
index 7379cc1e4..7aa9641e9 100644
--- a/backend/plugins/bitbucket/tasks/pr_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_collector.go
@@ -46,7 +46,6 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 50,
- Incremental: collectorWithState.IsIncremental(),
UrlTemplate: "repositories/{{ .Params.FullName }}/pullrequests",
Query: GetQueryCreatedAndUpdated(
`values.id,values.comment_count,values.type,values.state,values.title,values.description,`+
diff --git a/backend/plugins/bitbucket/tasks/pr_comment_collector.go
b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
index c4d539f85..ee7ccd228 100644
--- a/backend/plugins/bitbucket/tasks/pr_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
@@ -50,7 +50,6 @@ func CollectApiPullRequestsComments(taskCtx
plugin.SubTaskContext) errors.Error
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: collectorWithState.IsIncremental(),
Input: iterator,
UrlTemplate: "repositories/{{ .Params.FullName
}}/pullrequests/{{ .Input.BitbucketId }}/comments",
Query: GetQueryFields(
diff --git a/backend/plugins/bitbucket/tasks/pr_commit_collector.go
b/backend/plugins/bitbucket/tasks/pr_commit_collector.go
index 9d99da39c..8ed4a5a11 100644
--- a/backend/plugins/bitbucket/tasks/pr_commit_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_commit_collector.go
@@ -52,7 +52,6 @@ func CollectApiPullRequestCommits(taskCtx
plugin.SubTaskContext) errors.Error {
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: collectorWithState.IsIncremental(),
Input: iterator,
UrlTemplate: "repositories/{{ .Params.FullName
}}/pullrequests/{{ .Input.BitbucketId }}/commits",
GetNextPageCustomData: GetNextPageCustomData,
diff --git a/backend/plugins/github/tasks/cicd_job_collector.go
b/backend/plugins/github/tasks/cicd_job_collector.go
index 73fa2ab0d..93617f779 100644
--- a/backend/plugins/github/tasks/cicd_job_collector.go
+++ b/backend/plugins/github/tasks/cicd_job_collector.go
@@ -73,13 +73,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error
{
data.Options.GithubId, data.Options.ConnectionId,
),
}
- // incremental collection
- incremental := collectorWithState.IsIncremental()
- if incremental {
- clauses = append(
- clauses,
- dal.Where("github_updated_at > ?",
collectorWithState.LatestState.LatestSuccessStart),
- )
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("github_updated_at > ?",
collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -102,7 +97,6 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error
{
ApiClient: data.ApiClient,
PageSize: 100,
Input: iterator,
- Incremental: incremental,
UrlTemplate: "repos/{{ .Params.Name }}/actions/runs/{{
.Input.ID }}/jobs",
Query: func(reqData *api.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
diff --git a/backend/plugins/github/tasks/comment_collector.go
b/backend/plugins/github/tasks/comment_collector.go
index 197a1f7a0..73dfad834 100644
--- a/backend/plugins/github/tasks/comment_collector.go
+++ b/backend/plugins/github/tasks/comment_collector.go
@@ -58,23 +58,15 @@ func CollectApiComments(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: incremental,
-
UrlTemplate: "repos/{{ .Params.Name }}/issues/comments",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if collectorWithState.TimeAfter != nil {
- // Note that `since` is for filtering records
by the `updated` time
- query.Set("since",
collectorWithState.TimeAfter.String())
- }
- // if incremental == true, we overwrite it
- if incremental {
- query.Set("since",
collectorWithState.LatestState.LatestSuccessStart.String())
+ if collectorWithState.Since != nil {
+ query.Set("since",
collectorWithState.Since.String())
}
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("direction", "asc")
diff --git a/backend/plugins/github/tasks/commit_collector.go
b/backend/plugins/github/tasks/commit_collector.go
index ca37c6227..5a9be44db 100644
--- a/backend/plugins/github/tasks/commit_collector.go
+++ b/backend/plugins/github/tasks/commit_collector.go
@@ -58,11 +58,9 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 100,
- Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
/*
url may use arbitrary variables from different source
in any order, we need GoTemplate to allow more
flexible for all kinds of possibility.
@@ -79,11 +77,8 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if collectorWithState.TimeAfter != nil {
- query.Set("since",
collectorWithState.TimeAfter.String())
- }
- if incremental {
- query.Set("since",
collectorWithState.LatestState.LatestSuccessStart.String())
+ if collectorWithState.Since != nil {
+ query.Set("since",
collectorWithState.Since.String())
}
query.Set("direction", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
diff --git a/backend/plugins/github/tasks/issue_collector.go
b/backend/plugins/github/tasks/issue_collector.go
index c27eaea86..67d2ce506 100644
--- a/backend/plugins/github/tasks/issue_collector.go
+++ b/backend/plugins/github/tasks/issue_collector.go
@@ -58,11 +58,9 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 100,
- Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
/*
url may use arbitrary variables from different source
in any order, we need GoTemplate to allow more
flexible for all kinds of possibility.
@@ -79,11 +77,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if collectorWithState.TimeAfter != nil {
- query.Set("since",
collectorWithState.TimeAfter.String())
- }
- if incremental {
- query.Set("since",
collectorWithState.LatestState.LatestSuccessStart.String())
+ if collectorWithState.Since != nil {
+ query.Set("since",
collectorWithState.Since.String())
}
query.Set("direction", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
diff --git a/backend/plugins/github/tasks/pr_commit_collector.go
b/backend/plugins/github/tasks/pr_commit_collector.go
index 0a209ff2e..77c0e2a26 100644
--- a/backend/plugins/github/tasks/pr_commit_collector.go
+++ b/backend/plugins/github/tasks/pr_commit_collector.go
@@ -73,20 +73,18 @@ func CollectApiPullRequestCommits(taskCtx
plugin.SubTaskContext) errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
-
clauses := []dal.Clause{
dal.Select("number, github_id"),
dal.From(models.GithubPullRequest{}.TableName()),
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
}
- // incremental collection, no need to care about the timeFilter since
it has to be collected by PR
- if incremental {
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
clauses = append(
clauses,
- dal.Where("github_updated_at > ?",
collectorWithState.LatestState.LatestSuccessStart),
+ dal.Where("github_updated_at > ?",
collectorWithState.Since),
)
}
+
cursor, err := db.Cursor(
clauses...,
)
@@ -98,10 +96,9 @@ func CollectApiPullRequestCommits(taskCtx
plugin.SubTaskContext) errors.Error {
return err
}
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 100,
- Incremental: incremental,
- Input: iterator,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ Input: iterator,
UrlTemplate: "repos/{{ .Params.Name }}/pulls/{{ .Input.Number
}}/commits",
diff --git a/backend/plugins/github/tasks/pr_review_collector.go
b/backend/plugins/github/tasks/pr_review_collector.go
index 18bbde6b3..6cc8d8ce9 100644
--- a/backend/plugins/github/tasks/pr_review_collector.go
+++ b/backend/plugins/github/tasks/pr_review_collector.go
@@ -65,18 +65,18 @@ func CollectApiPullRequestReviews(taskCtx
plugin.SubTaskContext) errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("number, github_id"),
dal.From(models.GithubPullRequest{}.TableName()),
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
}
- if incremental {
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
clauses = append(
clauses,
- dal.Where("github_updated_at > ?",
collectorWithState.LatestState.LatestSuccessStart),
+ dal.Where("github_updated_at > ?",
collectorWithState.Since),
)
}
+
cursor, err := db.Cursor(
clauses...,
)
@@ -90,10 +90,9 @@ func CollectApiPullRequestReviews(taskCtx
plugin.SubTaskContext) errors.Error {
}
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 100,
- Incremental: incremental,
- Input: iterator,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ Input: iterator,
UrlTemplate: "repos/{{ .Params.Name }}/pulls/{{ .Input.Number
}}/reviews",
diff --git a/backend/plugins/github/tasks/pr_review_comment_collector.go
b/backend/plugins/github/tasks/pr_review_comment_collector.go
index eb3720ce0..e05ae2812 100644
--- a/backend/plugins/github/tasks/pr_review_comment_collector.go
+++ b/backend/plugins/github/tasks/pr_review_comment_collector.go
@@ -61,11 +61,9 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 100,
- Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
Header: func(reqData *helper.RequestData) (http.Header,
errors.Error) {
// Adding -H "Accept: application/vnd.github+json"
solve the issue of getting 502/403 error
header := http.Header{}
@@ -76,13 +74,8 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext)
errors.Error {
UrlTemplate: "repos/{{ .Params.Name }}/pulls/comments",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- if collectorWithState.TimeAfter != nil {
- // Note that `since` is for filtering records
by the `updated` time
- query.Set("since",
collectorWithState.TimeAfter.String())
- }
- // if incremental == true, we overwrite it
- if incremental {
- query.Set("since",
collectorWithState.LatestState.LatestSuccessStart.String())
+ if collectorWithState.Since != nil {
+ query.Set("since",
collectorWithState.Since.String())
}
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("direction", "asc")
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go
b/backend/plugins/github_graphql/tasks/issue_collector.go
index 7645db8d9..cfb0459e4 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -109,21 +109,17 @@ func CollectIssue(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err =
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
GraphqlClient: data.GraphqlClient,
PageSize: 100,
- Incremental: incremental,
BuildQuery: func(reqData *helper.GraphqlRequestData)
(interface{}, map[string]interface{}, error) {
query := &GraphqlQueryIssueWrapper{}
if reqData == nil {
return query, map[string]interface{}{}, nil
}
since := helper.DateTime{}
- if incremental {
- since = helper.DateTime{Time:
*collectorWithState.LatestState.LatestSuccessStart}
- } else if collectorWithState.TimeAfter != nil {
- since = helper.DateTime{Time:
*collectorWithState.TimeAfter}
+ if collectorWithState.Since != nil {
+ since = helper.DateTime{Time:
*collectorWithState.Since}
}
ownerName := strings.Split(data.Options.Name, "/")
variables := map[string]interface{}{
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go
b/backend/plugins/github_graphql/tasks/job_collector.go
index 095276c4a..41836d3a8 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -116,17 +116,16 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
-
clauses := []dal.Clause{
dal.Select("check_suite_node_id"),
dal.From(models.GithubRun{}.TableName()),
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
dal.Orderby("github_updated_at DESC"),
}
- if incremental {
- clauses = append(clauses, dal.Where("github_updated_at > ?",
*collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("github_updated_at > ?",
*collectorWithState.Since))
}
+
cursor, err := db.Cursor(
clauses...,
)
@@ -142,7 +141,6 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext)
errors.Error {
err =
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
Input: iterator,
InputStep: 20,
- Incremental: incremental,
GraphqlClient: data.GraphqlClient,
BuildQuery: func(reqData *helper.GraphqlRequestData)
(interface{}, map[string]interface{}, error) {
query := &GraphqlQueryCheckRunWrapper{}
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go
b/backend/plugins/github_graphql/tasks/pr_collector.go
index afd8ff0e8..37d10e742 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -160,11 +160,9 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error
{
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitGraphQLCollector(api.GraphqlCollectorArgs{
GraphqlClient: data.GraphqlClient,
PageSize: 10,
- Incremental: incremental,
/*
(Optional) Return query string for request, or you can
plug them into UrlTemplate directly
*/
@@ -194,7 +192,7 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
isFinish := false
for _, rawL := range prs {
// collect data even though in increment mode
because of updating existing data
- if collectorWithState.TimeAfter != nil &&
!collectorWithState.TimeAfter.Before(rawL.UpdatedAt) {
+ if collectorWithState.Since != nil &&
!collectorWithState.Since.Before(rawL.UpdatedAt) {
isFinish = true
break
}
diff --git a/backend/plugins/gitlab/tasks/issue_collector.go
b/backend/plugins/gitlab/tasks/issue_collector.go
index 0829f9cde..81c78be37 100644
--- a/backend/plugins/gitlab/tasks/issue_collector.go
+++ b/backend/plugins/gitlab/tasks/issue_collector.go
@@ -51,11 +51,9 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 100,
- Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
UrlTemplate: "projects/{{ .Params.ProjectId }}/issues",
/*
@@ -63,11 +61,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
*/
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- if collectorWithState.TimeAfter != nil {
- query.Set("updated_after",
collectorWithState.TimeAfter.Format(time.RFC3339))
- }
- if incremental {
- query.Set("updated_after",
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
+ if collectorWithState.Since != nil {
+ query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
}
query.Set("sort", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
diff --git a/backend/plugins/gitlab/tasks/mr_collector.go
b/backend/plugins/gitlab/tasks/mr_collector.go
index c55fe760e..ce0f21b3c 100644
--- a/backend/plugins/gitlab/tasks/mr_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_collector.go
@@ -48,11 +48,9 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: incremental,
UrlTemplate: "projects/{{ .Params.ProjectId
}}/merge_requests",
GetTotalPages: GetTotalPagesFromResponse,
ResponseParser: GetRawMessageFromResponse,
@@ -61,11 +59,8 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return nil, err
}
- if collectorWithState.TimeAfter != nil {
- query.Set("updated_after",
collectorWithState.TimeAfter.Format(time.RFC3339))
- }
- if incremental {
- query.Set("updated_after",
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
+ if collectorWithState.Since != nil {
+ query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
}
return query, nil
},
diff --git a/backend/plugins/gitlab/tasks/mr_detail_collector.go
b/backend/plugins/gitlab/tasks/mr_detail_collector.go
index 5b9af48dc..f60871a24 100644
--- a/backend/plugins/gitlab/tasks/mr_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_detail_collector.go
@@ -85,11 +85,10 @@ func GetMergeRequestDetailsIterator(taskCtx
plugin.SubTaskContext, collectorWith
data.Options.ProjectId, data.Options.ConnectionId, true,
),
}
- if collectorWithState.LatestState.LatestSuccessStart != nil {
- clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.LatestState.LatestSuccessStart))
- } else if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.TimeAfter))
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.Since))
}
+
// construct the input iterator
cursor, err := db.Cursor(clauses...)
if err != nil {
diff --git a/backend/plugins/gitlab/tasks/mr_note_collector.go
b/backend/plugins/gitlab/tasks/mr_note_collector.go
index d6fde5751..282135982 100644
--- a/backend/plugins/gitlab/tasks/mr_note_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_note_collector.go
@@ -51,12 +51,9 @@ func CollectApiMergeRequestsNotes(taskCtx
plugin.SubTaskContext) errors.Error {
}
defer iterator.Close()
- incremental := collectorWithState.IsIncremental()
-
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: incremental,
Input: iterator,
UrlTemplate: "projects/{{ .Params.ProjectId
}}/merge_requests/{{ .Input.Iid }}/notes?system=false",
Query: GetQuery,
diff --git a/backend/plugins/gitlab/tasks/pipeline_collector.go
b/backend/plugins/gitlab/tasks/pipeline_collector.go
index c22cc872a..06a14c925 100644
--- a/backend/plugins/gitlab/tasks/pipeline_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_collector.go
@@ -53,21 +53,17 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
+
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
MinTickInterval: &tickInterval,
PageSize: 100,
- Incremental: incremental,
UrlTemplate: "projects/{{ .Params.ProjectId
}}/pipelines",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- if collectorWithState.TimeAfter != nil {
- query.Set("updated_after",
collectorWithState.TimeAfter.Format(time.RFC3339))
- }
- if incremental {
- query.Set("updated_after",
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
+ if collectorWithState.Since != nil {
+ query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
}
query.Set("with_stats", "true")
query.Set("sort", "asc")
diff --git a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
index a37807b8c..aa4c80148 100644
--- a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
@@ -55,8 +55,6 @@ func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
-
iterator, err := GetPipelinesIterator(taskCtx, collectorWithState)
if err != nil {
return err
@@ -68,7 +66,6 @@ func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext)
errors.Error {
ApiClient: data.ApiClient,
MinTickInterval: &tickInterval,
Input: iterator,
- Incremental: incremental,
UrlTemplate: "projects/{{ .Params.ProjectId
}}/pipelines/{{ .Input.GitlabId }}",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
@@ -96,8 +93,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *hel
data.Options.ProjectId, data.Options.ConnectionId, true,
),
}
- if collectorWithState.LatestState.LatestSuccessStart != nil {
- clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.Since))
}
// construct the input iterator
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/gitlab/tasks/shared.go
b/backend/plugins/gitlab/tasks/shared.go
index 06cc46225..7f1e3e8c1 100644
--- a/backend/plugins/gitlab/tasks/shared.go
+++ b/backend/plugins/gitlab/tasks/shared.go
@@ -159,8 +159,8 @@ func GetMergeRequestsIterator(taskCtx
plugin.SubTaskContext, collectorWithState
),
}
if collectorWithState != nil {
- if collectorWithState.LatestState.LatestSuccessStart != nil {
- clauses = append(clauses, dal.Where("gitlab_updated_at
> ?", *collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("gitlab_updated_at
> ?", *collectorWithState.Since))
}
}
// construct the input iterator
diff --git a/backend/plugins/jenkins/tasks/stage_collector.go
b/backend/plugins/jenkins/tasks/stage_collector.go
index bf159c1d6..47530c95e 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -67,12 +67,9 @@ func CollectApiStages(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and
tjb.job_name = ? and tjb.class = ?`,
data.Options.ConnectionId, data.Options.JobPath,
data.Options.JobName, "WorkflowRun"),
}
-
- incremental := collectorWithState.IsIncremental()
- if incremental && collectorWithState.LatestState.LatestSuccessStart !=
nil {
- clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
collectorWithState.Since))
}
-
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
diff --git a/backend/plugins/jira/tasks/development_panel_collector.go
b/backend/plugins/jira/tasks/development_panel_collector.go
index 278052cdc..64a7ea372 100644
--- a/backend/plugins/jira/tasks/development_panel_collector.go
+++ b/backend/plugins/jira/tasks/development_panel_collector.go
@@ -72,14 +72,9 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ?",
data.Options.ConnectionId, data.Options.BoardId),
}
- incremental := collectorWithState.IsIncremental()
- if incremental && collectorWithState.LatestState.LatestSuccessStart !=
nil {
- clauses = append(
- clauses,
- dal.Where("i.updated > ?",
collectorWithState.LatestState.LatestSuccessStart),
- )
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
}
-
cursor, err := db.Cursor(clauses...)
if err != nil {
logger.Error(err, "collect development panel error")
@@ -93,9 +88,8 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext)
errors.Error {
}
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- Input: iterator,
- Incremental: incremental,
+ ApiClient: data.ApiClient,
+ Input: iterator,
// the URL looks like:
//
https://merico.atlassian.net/rest/dev-status/1.0/issue/detail?issueId=25184&applicationType=GitLab&dataType=repository
UrlTemplate: "dev-status/1.0/issue/detail",
diff --git a/backend/plugins/jira/tasks/issue_changelog_collector.go
b/backend/plugins/jira/tasks/issue_changelog_collector.go
index 7b5fc7674..bcbf46dc0 100644
--- a/backend/plugins/jira/tasks/issue_changelog_collector.go
+++ b/backend/plugins/jira/tasks/issue_changelog_collector.go
@@ -71,12 +71,8 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ? AND
i.std_type != ? and i.changelog_total > 100", data.Options.ConnectionId,
data.Options.BoardId, "Epic"),
}
- incremental := collectorWithState.IsIncremental()
- if incremental && collectorWithState.LatestState.LatestSuccessStart !=
nil {
- clauses = append(
- clauses,
- dal.Where("i.updated > ?",
collectorWithState.LatestState.LatestSuccessStart),
- )
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
}
if logger.IsLevelEnabled(log.LOG_DEBUG) {
@@ -102,7 +98,6 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: incremental,
GetTotalPages: GetTotalPagesFromResponse,
Input: iterator,
UrlTemplate: "api/3/issue/{{ .Input.IssueId }}/changelog",
diff --git a/backend/plugins/jira/tasks/issue_collector.go
b/backend/plugins/jira/tasks/issue_collector.go
index b00cd4ff3..3faaa9d51 100644
--- a/backend/plugins/jira/tasks/issue_collector.go
+++ b/backend/plugins/jira/tasks/issue_collector.go
@@ -70,19 +70,20 @@ func CollectIssues(taskCtx plugin.SubTaskContext)
errors.Error {
// build jql
// IMPORTANT: we have to keep paginated data in a consistence order to
avoid data-missing, if we sort issues by
// `updated`, issue will be jumping between pages if it got updated
during the collection process
- incremental := collectorWithState.IsIncremental()
loc, err := getTimeZone(taskCtx)
if err != nil {
logger.Info("failed to get timezone, err: %v", err)
} else {
logger.Info("got user's timezone: %v", loc.String())
}
- jql := buildJQL(collectorWithState.TimeAfter,
collectorWithState.LatestState.LatestSuccessStart, incremental, loc)
+ jql := "ORDER BY created ASC"
+ if collectorWithState.Since != nil {
+ jql = buildJQL(*collectorWithState.Since, loc)
+ }
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: data.Options.PageSize,
- Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: data.Options.PageSize,
/*
url may use arbitrary variables from different
connection in any order, we need GoTemplate to allow more
flexible for all kinds of possibility.
@@ -146,23 +147,15 @@ func CollectIssues(taskCtx plugin.SubTaskContext)
errors.Error {
}
// buildJQL build jql based on timeAfter and incremental mode
-func buildJQL(timeAfter, latestSuccessStart *time.Time, isIncremental bool,
location *time.Location) string {
+func buildJQL(since time.Time, location *time.Location) string {
jql := "ORDER BY created ASC"
- var moment time.Time
- if timeAfter != nil {
- moment = *timeAfter
- }
- // if isIncremental is true, we should not collect data before
latestSuccessStart
- if isIncremental && latestSuccessStart.After(moment) {
- moment = *latestSuccessStart
- }
- if !moment.IsZero() {
+ if !since.IsZero() {
if location != nil {
- moment = moment.In(location)
+ since = since.In(location)
} else {
- moment = moment.In(time.UTC).Add(-24 * time.Hour)
+ since = since.In(time.UTC).Add(-24 * time.Hour)
}
- jql = fmt.Sprintf("updated >= '%s' %s",
moment.Format("2006/01/02 15:04"), jql)
+ jql = fmt.Sprintf("updated >= '%s' %s",
since.Format("2006/01/02 15:04"), jql)
}
return jql
}
diff --git a/backend/plugins/jira/tasks/issue_collector_test.go
b/backend/plugins/jira/tasks/issue_collector_test.go
index c9891943d..99bf5a533 100644
--- a/backend/plugins/jira/tasks/issue_collector_test.go
+++ b/backend/plugins/jira/tasks/issue_collector_test.go
@@ -26,13 +26,10 @@ func Test_buildJQL(t *testing.T) {
base := time.Date(2021, 2, 3, 4, 5, 6, 7, time.UTC)
timeAfter := base
add48 := base.Add(48 * time.Hour)
- minus48 := base.Add(-48 * time.Hour)
loc, _ := time.LoadLocation("Asia/Shanghai")
type args struct {
- timeAfter *time.Time
- latestSuccessStart *time.Time
- isIncremental bool
- location *time.Location
+ since *time.Time
+ location *time.Location
}
tests := []struct {
name string
@@ -42,56 +39,15 @@ func Test_buildJQL(t *testing.T) {
{
name: "test incremental",
args: args{
- timeAfter: nil,
- latestSuccessStart: nil,
- isIncremental: false,
- },
- want: "ORDER BY created ASC"},
- {
- name: "test incremental",
- args: args{
- timeAfter: nil,
- latestSuccessStart: &add48,
- isIncremental: true,
- location: loc,
- },
- want: "updated >= '2021/02/05 12:05' ORDER BY created
ASC",
- },
- {
- name: "test incremental",
- args: args{
- timeAfter: &base,
- latestSuccessStart: nil,
- isIncremental: false,
- location: loc,
- },
- want: "updated >= '2021/02/03 12:05' ORDER BY created
ASC",
- },
- {
- name: "test incremental",
- args: args{
- timeAfter: &timeAfter,
- latestSuccessStart: &add48,
- isIncremental: true,
- },
- want: "updated >= '2021/02/04 04:05' ORDER BY created
ASC",
- },
- {
- name: "test incremental",
- args: args{
- timeAfter: &timeAfter,
- latestSuccessStart: &add48,
- isIncremental: true,
- location: loc,
+ since: &add48,
+ location: loc,
},
want: "updated >= '2021/02/05 12:05' ORDER BY created
ASC",
},
{
name: "test incremental",
args: args{
- timeAfter: &timeAfter,
- latestSuccessStart: &minus48,
- isIncremental: true,
+ since: &timeAfter,
},
want: "updated >= '2021/02/02 04:05' ORDER BY created
ASC",
},
@@ -99,7 +55,7 @@ func Test_buildJQL(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if got := buildJQL(tt.args.timeAfter,
tt.args.latestSuccessStart, tt.args.isIncremental, tt.args.location); got !=
tt.want {
+ if got := buildJQL(*tt.args.since, tt.args.location);
got != tt.want {
t.Errorf("buildJQL() = %v, want %v", got,
tt.want)
}
})
diff --git a/backend/plugins/jira/tasks/issue_comment_collector.go
b/backend/plugins/jira/tasks/issue_comment_collector.go
index fb0d2168c..71c91491e 100644
--- a/backend/plugins/jira/tasks/issue_comment_collector.go
+++ b/backend/plugins/jira/tasks/issue_comment_collector.go
@@ -71,14 +71,9 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ? AND
i.std_type != ? AND i.comment_total > 100", data.Options.ConnectionId,
data.Options.BoardId, "Epic"),
}
- incremental := collectorWithState.IsIncremental()
- if incremental && collectorWithState.LatestState.LatestSuccessStart !=
nil {
- clauses = append(
- clauses,
- dal.Where("i.updated > ?",
collectorWithState.LatestState.LatestSuccessStart),
- )
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
}
-
if logger.IsLevelEnabled(log.LOG_DEBUG) {
count, err := db.Count(clauses...)
if err != nil {
@@ -102,7 +97,6 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: incremental,
GetTotalPages: GetTotalPagesFromResponse,
Input: iterator,
UrlTemplate: "api/3/issue/{{ .Input.IssueId }}/comment",
diff --git a/backend/plugins/jira/tasks/remotelink_collector.go
b/backend/plugins/jira/tasks/remotelink_collector.go
index 158640d5c..450723ec2 100644
--- a/backend/plugins/jira/tasks/remotelink_collector.go
+++ b/backend/plugins/jira/tasks/remotelink_collector.go
@@ -70,14 +70,9 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ?",
data.Options.ConnectionId, data.Options.BoardId),
}
- incremental := collectorWithState.IsIncremental()
- if incremental && collectorWithState.LatestState.LatestSuccessStart !=
nil {
- clauses = append(
- clauses,
- dal.Where("i.updated > ?",
collectorWithState.LatestState.LatestSuccessStart),
- )
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
}
-
cursor, err := db.Cursor(clauses...)
if err != nil {
logger.Error(err, "collect remotelink error")
@@ -93,7 +88,6 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
Input: iterator,
- Incremental: incremental,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/remotelink",
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
if res.StatusCode == http.StatusNotFound {
diff --git a/backend/plugins/jira/tasks/worklog_collector.go
b/backend/plugins/jira/tasks/worklog_collector.go
index d0feab119..71121597f 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -66,21 +66,19 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where("i.updated > i.created AND bi.connection_id = ? AND
bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
dal.Groupby("i.issue_id, i.updated"),
}
- incremental := collectorWithState.IsIncremental()
- if incremental {
- clauses = append(clauses, dal.Having("i.updated > ? AND
(i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND
COUNT(wl.worklog_id) > 0))", collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Having("i.updated > ? AND
(i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND
COUNT(wl.worklog_id) > 0))", collectorWithState.Since))
} else {
/*
- i.updated > max(rl.issue_updated) was deleted because
for non-incremental collection,
- max(rl.issue_updated) will only be one of null, less or
equal to i.updated
- so i.updated > max(rl.issue_updated) is always false.
- max(c.issue_updated) IS NULL AND COUNT(c.worklog_id) >
0 infers the issue has more than 100 worklogs,
+ i.updated > max(wl.issue_updated) was deleted because
for non-incremental collection,
+ max(wl.issue_updated) will only be one of null, less or
equal to i.updated
+ so i.updated > max(wl.issue_updated) is always false.
+ max(wl.issue_updated) IS NULL AND COUNT(wl.worklog_id)
> 0 infers the issue has more than 100 worklogs,
because we collected worklogs when collecting issues,
and assign worklog.issue_updated if num of worklogs < 100,
- and max(c.issue_updated) IS NULL AND
COUNT(c.worklog_id) > 0 means all worklogs for the issue were not assigned
issue_updated
+ and max(wl.issue_updated) IS NULL AND
COUNT(wl.worklog_id) > 0 means all worklogs for the issue were not assigned
issue_updated
*/
clauses = append(clauses, dal.Having("max(wl.issue_updated) IS
NULL AND COUNT(wl.worklog_id) > 0"))
}
-
// construct the input iterator
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -96,7 +94,6 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
ApiClient: data.ApiClient,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/worklog",
PageSize: 50,
- Incremental: incremental,
GetTotalPages: GetTotalPagesFromResponse,
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
var data struct {
diff --git a/backend/plugins/tapd/tasks/bug_changelog_collector.go
b/backend/plugins/tapd/tasks/bug_changelog_collector.go
index 264e45342..bf768e0d1 100644
--- a/backend/plugins/tapd/tasks/bug_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/bug_changelog_collector.go
@@ -38,10 +38,8 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "bug_changes",
@@ -51,11 +49,8 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created desc")
- if collectorWithState.TimeAfter != nil {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/bug_collector.go
b/backend/plugins/tapd/tasks/bug_collector.go
index b5c85f68a..746c1bd89 100644
--- a/backend/plugins/tapd/tasks/bug_collector.go
+++ b/backend/plugins/tapd/tasks/bug_collector.go
@@ -38,9 +38,8 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
+
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "bugs",
@@ -51,11 +50,8 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error
{
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/bug_commit_collector.go
b/backend/plugins/tapd/tasks/bug_commit_collector.go
index 97a1c2937..3164adb05 100644
--- a/backend/plugins/tapd/tasks/bug_commit_collector.go
+++ b/backend/plugins/tapd/tasks/bug_commit_collector.go
@@ -44,17 +44,14 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
- incremental := collectorWithState.IsIncremental()
+
clauses := []dal.Clause{
dal.Select("_tool_tapd_bugs.id as issue_id, modified as
update_time"),
dal.From(&models.TapdBug{}),
dal.Where("_tool_tapd_bugs.connection_id = ? and
_tool_tapd_bugs.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.TimeAfter))
- }
- if incremental {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -67,7 +64,6 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
- Incremental: incremental,
Input: iterator,
UrlTemplate: "code_commit_infos",
Query: func(reqData *api.RequestData) (url.Values,
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/iteration_collector.go
b/backend/plugins/tapd/tasks/iteration_collector.go
index 3a6ebc182..5f5512b1f 100644
--- a/backend/plugins/tapd/tasks/iteration_collector.go
+++ b/backend/plugins/tapd/tasks/iteration_collector.go
@@ -40,9 +40,8 @@ func CollectIterations(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
+
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
Concurrency: 3,
@@ -53,11 +52,8 @@ func CollectIterations(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/story_bug_collector.go
b/backend/plugins/tapd/tasks/story_bug_collector.go
index b9ac18dfa..9d5d0ed0c 100644
--- a/backend/plugins/tapd/tasks/story_bug_collector.go
+++ b/backend/plugins/tapd/tasks/story_bug_collector.go
@@ -42,17 +42,14 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext)
errors.Error {
}
logger := taskCtx.GetLogger()
logger.Info("collect storyBugs")
- incremental := collectorWithState.IsIncremental()
+
clauses := []dal.Clause{
dal.Select("id as issue_id, modified as update_time"),
dal.From(&models.TapdStory{}),
dal.Where("_tool_tapd_stories.connection_id = ? and
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.TimeAfter))
- }
- if incremental {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -64,7 +61,6 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext)
errors.Error {
}
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
- Incremental: incremental,
Input: iterator,
UrlTemplate: "stories/get_related_bugs",
Query: func(reqData *api.RequestData) (url.Values,
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/story_changelog_collector.go
b/backend/plugins/tapd/tasks/story_changelog_collector.go
index 7aa863417..d0597b7ef 100644
--- a/backend/plugins/tapd/tasks/story_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/story_changelog_collector.go
@@ -38,10 +38,8 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
}
logger := taskCtx.GetLogger()
logger.Info("collect storyChangelogs")
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "story_changes",
@@ -51,11 +49,8 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/story_collector.go
b/backend/plugins/tapd/tasks/story_collector.go
index 211c7f268..095c72b3d 100644
--- a/backend/plugins/tapd/tasks/story_collector.go
+++ b/backend/plugins/tapd/tasks/story_collector.go
@@ -38,9 +38,8 @@ func CollectStorys(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
+
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "stories",
@@ -51,11 +50,8 @@ func CollectStorys(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/story_commit_collector.go
b/backend/plugins/tapd/tasks/story_commit_collector.go
index b69c6fa96..708c71010 100644
--- a/backend/plugins/tapd/tasks/story_commit_collector.go
+++ b/backend/plugins/tapd/tasks/story_commit_collector.go
@@ -44,17 +44,13 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
- incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("_tool_tapd_stories.id as issue_id, modified as
update_time"),
dal.From(&models.TapdStory{}),
dal.Where("_tool_tapd_stories.connection_id = ? and
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.TimeAfter))
- }
- if incremental {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -66,7 +62,6 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
- Incremental: incremental,
Input: iterator,
UrlTemplate: "code_commit_infos",
Query: func(reqData *api.RequestData) (url.Values,
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/task_changelog_collector.go
b/backend/plugins/tapd/tasks/task_changelog_collector.go
index e4efec684..7aba00c2a 100644
--- a/backend/plugins/tapd/tasks/task_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/task_changelog_collector.go
@@ -38,9 +38,7 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
}
logger := taskCtx.GetLogger()
logger.Info("collect taskChangelogs")
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "task_changes",
@@ -50,11 +48,8 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/task_collector.go
b/backend/plugins/tapd/tasks/task_collector.go
index 3511fff11..2d5bfdba3 100644
--- a/backend/plugins/tapd/tasks/task_collector.go
+++ b/backend/plugins/tapd/tasks/task_collector.go
@@ -38,10 +38,9 @@ func CollectTasks(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
+
collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "tasks",
@@ -52,11 +51,8 @@ func CollectTasks(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/task_commit_collector.go
b/backend/plugins/tapd/tasks/task_commit_collector.go
index 88da97d6e..0ef938997 100644
--- a/backend/plugins/tapd/tasks/task_commit_collector.go
+++ b/backend/plugins/tapd/tasks/task_commit_collector.go
@@ -44,17 +44,13 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
- incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("_tool_tapd_tasks.id as issue_id, modified as
update_time"),
dal.From(&models.TapdTask{}),
dal.Where("_tool_tapd_tasks.connection_id = ? and
_tool_tapd_tasks.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.TimeAfter))
- }
- if incremental {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -66,7 +62,6 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
- Incremental: incremental,
Input: iterator,
UrlTemplate: "code_commit_infos",
Query: func(reqData *api.RequestData) (url.Values,
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/worklog_collector.go
b/backend/plugins/tapd/tasks/worklog_collector.go
index 935f89758..b091f9e44 100644
--- a/backend/plugins/tapd/tasks/worklog_collector.go
+++ b/backend/plugins/tapd/tasks/worklog_collector.go
@@ -38,9 +38,8 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
+
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "timesheets",
@@ -50,11 +49,8 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/zentao/tasks/bug_commits_collector.go
b/backend/plugins/zentao/tasks/bug_commits_collector.go
index 526ffc8a4..cb55ceda6 100644
--- a/backend/plugins/zentao/tasks/bug_commits_collector.go
+++ b/backend/plugins/zentao/tasks/bug_commits_collector.go
@@ -70,13 +70,8 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
data.Options.ProjectId, data.Options.ConnectionId,
),
}
- // incremental collection
- incremental := collectorWithState.IsIncremental()
- if incremental {
- clauses = append(
- clauses,
- dal.Where("last_edited_date is not null and
last_edited_date > ?", collectorWithState.LatestState.LatestSuccessStart),
- )
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("last_edited_date is not
null and last_edited_date > ?", collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -95,7 +90,6 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
},
ApiClient: data.ApiClient,
Input: iterator,
- Incremental: incremental,
UrlTemplate: "bugs/{{ .Input.BugId }}",
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
var data struct {
diff --git a/backend/plugins/zentao/tasks/story_commits_collector.go
b/backend/plugins/zentao/tasks/story_commits_collector.go
index 117fdc7d9..a536f52f1 100644
--- a/backend/plugins/zentao/tasks/story_commits_collector.go
+++ b/backend/plugins/zentao/tasks/story_commits_collector.go
@@ -66,13 +66,8 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where(`_tool_zentao_project_stories.project_id = ? and
_tool_zentao_project_stories.connection_id = ?`,
data.Options.ProjectId, data.Options.ConnectionId),
}
- // incremental collection
- incremental := collectorWithState.IsIncremental()
- if incremental {
- clauses = append(
- clauses,
- dal.Where("last_edited_date is not null and
last_edited_date > ?", collectorWithState.LatestState.LatestSuccessStart),
- )
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("last_edited_date is not
null and last_edited_date > ?", collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -93,7 +88,6 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
},
ApiClient: data.ApiClient,
Input: iterator,
- Incremental: incremental,
UrlTemplate: "stories/{{ .Input.ID }}",
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
var data struct {
diff --git a/backend/plugins/zentao/tasks/task_commits_collector.go
b/backend/plugins/zentao/tasks/task_commits_collector.go
index 4d4c54feb..665069508 100644
--- a/backend/plugins/zentao/tasks/task_commits_collector.go
+++ b/backend/plugins/zentao/tasks/task_commits_collector.go
@@ -64,13 +64,8 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
data.Options.ProjectId, data.Options.ConnectionId,
),
}
- // incremental collection
- incremental := collectorWithState.IsIncremental()
- if incremental {
- clauses = append(
- clauses,
- dal.Where("last_edited_date is not null and
last_edited_date > ?", collectorWithState.LatestState.LatestSuccessStart),
- )
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("last_edited_date is not
null and last_edited_date > ?", collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -91,7 +86,6 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
},
ApiClient: data.ApiClient,
Input: iterator,
- Incremental: incremental,
UrlTemplate: "tasks/{{ .Input.ID }}",
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
var data struct {