This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch fix#6075-2
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/fix#6075-2 by this push:
new 3e5317f27 refactor: use since instead of timeafter,incremental,latestSuccessStart
3e5317f27 is described below
commit 3e5317f2766545ffc5a41b880e9127c3e3d78b61
Author: abeizn <[email protected]>
AuthorDate: Thu Sep 14 19:22:46 2023 +0800
refactor: use since instead of timeafter,incremental,latestSuccessStart
---
.../pluginhelper/api/api_collector_with_state.go | 42 +++++++---------
backend/plugins/bitbucket/tasks/api_common.go | 24 +++-------
backend/plugins/bitbucket/tasks/issue_collector.go | 1 -
.../bitbucket/tasks/issue_comment_collector.go | 1 -
.../plugins/bitbucket/tasks/pipeline_collector.go | 1 -
.../bitbucket/tasks/pipeline_steps_collector.go | 1 -
backend/plugins/bitbucket/tasks/pr_collector.go | 1 -
.../bitbucket/tasks/pr_comment_collector.go | 1 -
.../plugins/bitbucket/tasks/pr_commit_collector.go | 1 -
backend/plugins/github/tasks/cicd_job_collector.go | 11 +----
backend/plugins/github/tasks/comment_collector.go | 12 +----
backend/plugins/github/tasks/commit_collector.go | 13 ++---
backend/plugins/github/tasks/issue_collector.go | 13 ++---
.../plugins/github/tasks/pr_commit_collector.go | 22 ++++-----
.../plugins/github/tasks/pr_review_collector.go | 19 ++++----
.../github/tasks/pr_review_comment_collector.go | 15 ++----
.../github_graphql/tasks/issue_collector.go | 9 +---
.../plugins/github_graphql/tasks/job_collector.go | 8 +---
.../plugins/github_graphql/tasks/pr_collector.go | 2 -
backend/plugins/gitlab/tasks/issue_collector.go | 13 ++---
backend/plugins/gitlab/tasks/mr_collector.go | 9 +---
backend/plugins/gitlab/tasks/mr_note_collector.go | 3 --
backend/plugins/gitlab/tasks/pipeline_collector.go | 10 +---
.../gitlab/tasks/pipeline_detail_collector.go | 8 +---
backend/plugins/jenkins/tasks/stage_collector.go | 7 +--
.../jira/tasks/development_panel_collector.go | 13 ++---
.../jira/tasks/issue_changelog_collector.go | 9 +---
backend/plugins/jira/tasks/issue_collector.go | 29 +++++------
backend/plugins/jira/tasks/issue_collector_test.go | 56 +++-------------------
.../plugins/jira/tasks/issue_comment_collector.go | 9 +---
backend/plugins/jira/tasks/remotelink_collector.go | 9 +---
backend/plugins/jira/tasks/worklog_collector.go | 16 +------
.../plugins/tapd/tasks/bug_changelog_collector.go | 9 +---
backend/plugins/tapd/tasks/bug_collector.go | 10 +---
backend/plugins/tapd/tasks/bug_commit_collector.go | 11 ++---
backend/plugins/tapd/tasks/iteration_collector.go | 10 +---
backend/plugins/tapd/tasks/story_bug_collector.go | 11 ++---
.../tapd/tasks/story_changelog_collector.go | 9 +---
backend/plugins/tapd/tasks/story_collector.go | 10 +---
.../plugins/tapd/tasks/story_commit_collector.go | 10 +---
.../plugins/tapd/tasks/task_changelog_collector.go | 9 +---
backend/plugins/tapd/tasks/task_collector.go | 10 +---
.../plugins/tapd/tasks/task_commit_collector.go | 10 +---
backend/plugins/tapd/tasks/worklog_collector.go | 10 +---
.../plugins/zentao/tasks/bug_commits_collector.go | 11 +----
.../zentao/tasks/story_commits_collector.go | 11 +----
.../plugins/zentao/tasks/task_commits_collector.go | 11 +----
47 files changed, 118 insertions(+), 422 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go
b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 813a75f73..a6b35e71c 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -38,6 +38,7 @@ type ApiCollectorStateManager struct {
subtasks []plugin.SubTask
LatestState models.CollectorLatestState
TimeAfter *time.Time
+ Since *time.Time
ExecuteStart time.Time
}
@@ -66,32 +67,32 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager
if syncPolicy != nil && syncPolicy.TimeAfter != nil {
timeAfter = syncPolicy.TimeAfter
}
+ since := GetSince(latestState.LatestSuccessStart, syncPolicy)
+
return &ApiCollectorStateManager{
RawDataSubTaskArgs: args,
LatestState: latestState,
TimeAfter: timeAfter,
+ Since: since,
ExecuteStart: time.Now(),
}, nil
}
-// IsIncremental indicates if the collector should operate in incremental mode
-func (m *ApiCollectorStateManager) IsIncremental() bool {
- prevSyncTime := m.LatestState.LatestSuccessStart
- prevTimeAfter := m.LatestState.TimeAfter
- syncPolicy := m.Ctx.TaskContext().SyncPolicy()
- if syncPolicy != nil && syncPolicy.FullSync {
- return false
+func GetSince(latestSuccessStart *time.Time, syncPolicy *models.SyncPolicy)
*time.Time {
+ // If the execution has not been successful before, return timeAfter
+ if latestSuccessStart == nil {
+ return syncPolicy.TimeAfter
}
-
- if prevSyncTime == nil {
- return false
+ // If syncPolicy is nil, return latestSuccessStart
+ if syncPolicy == nil {
+ return latestSuccessStart
}
- // if we cleared the timeAfter, or moved timeAfter back in time, we
should do a full sync
- currTimeAfter := syncPolicy.TimeAfter
- if currTimeAfter != nil {
- return prevTimeAfter == nil ||
!currTimeAfter.Before(*prevTimeAfter)
+ // If syncPolicy is fullSync or timeAfter is after latestSuccessStart,
return timeAfter
+ if syncPolicy.FullSync ||
syncPolicy.TimeAfter.After(*latestSuccessStart) {
+ return syncPolicy.TimeAfter
}
- return prevTimeAfter == nil
+
+ return latestSuccessStart
}
// InitCollector init the embedded collector
@@ -165,19 +166,11 @@ func NewStatefulApiCollectorForFinalizableEntity(args
FinalizableApiCollectorArg
return nil, err
}
- var isIncremental = manager.IsIncremental()
- var createdAfter *time.Time
- if isIncremental {
- createdAfter = manager.LatestState.LatestSuccessStart
- } else {
- createdAfter = manager.TimeAfter
- }
-
+ createdAfter := manager.Since
// step 1: create a collector to collect newly added records
err = manager.InitCollector(ApiCollectorArgs{
ApiClient: args.ApiClient,
// common
- Incremental: isIncremental,
UrlTemplate: args.CollectNewRecordsByList.UrlTemplate,
Query: func(reqData *RequestData) (url.Values, errors.Error) {
if args.CollectNewRecordsByList.Query != nil {
@@ -249,7 +242,6 @@ func NewStatefulApiCollectorForFinalizableEntity(args
FinalizableApiCollectorArg
err = manager.InitCollector(ApiCollectorArgs{
ApiClient: args.ApiClient,
// common
- Incremental: true,
Input: input,
UrlTemplate: args.CollectUnfinishedDetails.UrlTemplate,
Query: func(reqData *RequestData) (url.Values, errors.Error) {
diff --git a/backend/plugins/bitbucket/tasks/api_common.go
b/backend/plugins/bitbucket/tasks/api_common.go
index 284df3df2..31bf47d73 100644
--- a/backend/plugins/bitbucket/tasks/api_common.go
+++ b/backend/plugins/bitbucket/tasks/api_common.go
@@ -101,14 +101,7 @@ func GetQueryCreatedAndUpdated(fields string,
collectorWithState *api.ApiCollect
}
query.Set("fields", fields)
query.Set("sort", "created_on")
- if collectorWithState.IsIncremental() {
- latestSuccessStart :=
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339)
- query.Set("q", fmt.Sprintf("updated_on>=%s",
latestSuccessStart))
- } else if collectorWithState.TimeAfter != nil {
- timeAfter :=
collectorWithState.TimeAfter.Format(time.RFC3339)
- query.Set("q", fmt.Sprintf("updated_on>=%s", timeAfter))
- }
-
+ query.Set("q", fmt.Sprintf("updated_on>=%s",
collectorWithState.Since.Format(time.RFC3339)))
return query, nil
}
}
@@ -179,9 +172,8 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext,
collectorWithState *
data.Options.FullName, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncremental() {
- clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.LatestState.LatestSuccessStart))
- }
+ clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.Since))
+
// construct the input iterator
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -202,9 +194,8 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *api.Ap
data.Options.FullName, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncremental() {
- clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.LatestState.LatestSuccessStart))
- }
+ clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.Since))
+
// construct the input iterator
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -225,9 +216,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *api
data.Options.FullName, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncremental() {
- clauses = append(clauses, dal.Where("bitbucket_complete_on >
?", *collectorWithState.LatestState.LatestSuccessStart))
- }
+ clauses = append(clauses, dal.Where("bitbucket_complete_on > ?",
*collectorWithState.Since))
+
// construct the input iterator
cursor, err := db.Cursor(clauses...)
if err != nil {
diff --git a/backend/plugins/bitbucket/tasks/issue_collector.go
b/backend/plugins/bitbucket/tasks/issue_collector.go
index a5f2e4a41..3edec1bea 100644
--- a/backend/plugins/bitbucket/tasks/issue_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_collector.go
@@ -43,7 +43,6 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: collectorWithState.IsIncremental(),
UrlTemplate: "repositories/{{ .Params.FullName }}/issues",
Query: GetQueryCreatedAndUpdated(
`values.type,values.id,values.links.self,`+
diff --git a/backend/plugins/bitbucket/tasks/issue_comment_collector.go
b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
index 8e0036c4a..cdc9c2e3d 100644
--- a/backend/plugins/bitbucket/tasks/issue_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
@@ -50,7 +50,6 @@ func CollectApiIssueComments(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: collectorWithState.IsIncremental(),
Input: iterator,
UrlTemplate: "repositories/{{ .Params.FullName }}/issues/{{
.Input.BitbucketId }}/comments",
Query: GetQueryFields(
diff --git a/backend/plugins/bitbucket/tasks/pipeline_collector.go
b/backend/plugins/bitbucket/tasks/pipeline_collector.go
index e25861936..d11a7f243 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_collector.go
@@ -43,7 +43,6 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 50,
- Incremental: collectorWithState.IsIncremental(),
UrlTemplate: "repositories/{{ .Params.FullName }}/pipelines/",
Query: GetQueryCreatedAndUpdated(
`values.uuid,values.type,values.state.name,values.state.result.name,values.state.result.type,values.state.stage.name,values.state.stage.type,`+
diff --git a/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
index 99cc1e39b..8c9d70aa7 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
@@ -52,7 +52,6 @@ func CollectPipelineSteps(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: collectorWithState.IsIncremental(),
Input: iterator,
UrlTemplate: "repositories/{{ .Params.FullName }}/pipelines/{{
.Input.BitbucketId }}/steps/",
Query: GetQueryFields(
diff --git a/backend/plugins/bitbucket/tasks/pr_collector.go
b/backend/plugins/bitbucket/tasks/pr_collector.go
index 7379cc1e4..7aa9641e9 100644
--- a/backend/plugins/bitbucket/tasks/pr_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_collector.go
@@ -46,7 +46,6 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 50,
- Incremental: collectorWithState.IsIncremental(),
UrlTemplate: "repositories/{{ .Params.FullName }}/pullrequests",
Query: GetQueryCreatedAndUpdated(
`values.id,values.comment_count,values.type,values.state,values.title,values.description,`+
diff --git a/backend/plugins/bitbucket/tasks/pr_comment_collector.go
b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
index c4d539f85..ee7ccd228 100644
--- a/backend/plugins/bitbucket/tasks/pr_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
@@ -50,7 +50,6 @@ func CollectApiPullRequestsComments(taskCtx
plugin.SubTaskContext) errors.Error
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: collectorWithState.IsIncremental(),
Input: iterator,
UrlTemplate: "repositories/{{ .Params.FullName
}}/pullrequests/{{ .Input.BitbucketId }}/comments",
Query: GetQueryFields(
diff --git a/backend/plugins/bitbucket/tasks/pr_commit_collector.go
b/backend/plugins/bitbucket/tasks/pr_commit_collector.go
index 9d99da39c..8ed4a5a11 100644
--- a/backend/plugins/bitbucket/tasks/pr_commit_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_commit_collector.go
@@ -52,7 +52,6 @@ func CollectApiPullRequestCommits(taskCtx
plugin.SubTaskContext) errors.Error {
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: collectorWithState.IsIncremental(),
Input: iterator,
UrlTemplate: "repositories/{{ .Params.FullName
}}/pullrequests/{{ .Input.BitbucketId }}/commits",
GetNextPageCustomData: GetNextPageCustomData,
diff --git a/backend/plugins/github/tasks/cicd_job_collector.go
b/backend/plugins/github/tasks/cicd_job_collector.go
index 73fa2ab0d..da2e977c8 100644
--- a/backend/plugins/github/tasks/cicd_job_collector.go
+++ b/backend/plugins/github/tasks/cicd_job_collector.go
@@ -73,14 +73,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error
{
data.Options.GithubId, data.Options.ConnectionId,
),
}
- // incremental collection
- incremental := collectorWithState.IsIncremental()
- if incremental {
- clauses = append(
- clauses,
- dal.Where("github_updated_at > ?",
collectorWithState.LatestState.LatestSuccessStart),
- )
- }
+ clauses = append(clauses, dal.Where("github_updated_at > ?",
collectorWithState.Since))
+
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
@@ -102,7 +96,6 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error
{
ApiClient: data.ApiClient,
PageSize: 100,
Input: iterator,
- Incremental: incremental,
UrlTemplate: "repos/{{ .Params.Name }}/actions/runs/{{
.Input.ID }}/jobs",
Query: func(reqData *api.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
diff --git a/backend/plugins/github/tasks/comment_collector.go
b/backend/plugins/github/tasks/comment_collector.go
index 197a1f7a0..5c2d2afd2 100644
--- a/backend/plugins/github/tasks/comment_collector.go
+++ b/backend/plugins/github/tasks/comment_collector.go
@@ -58,24 +58,14 @@ func CollectApiComments(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: incremental,
-
UrlTemplate: "repos/{{ .Params.Name }}/issues/comments",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if collectorWithState.TimeAfter != nil {
- // Note that `since` is for filtering records
by the `updated` time
- query.Set("since",
collectorWithState.TimeAfter.String())
- }
- // if incremental == true, we overwrite it
- if incremental {
- query.Set("since",
collectorWithState.LatestState.LatestSuccessStart.String())
- }
+ query.Set("since", collectorWithState.Since.String())
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("direction", "asc")
query.Set("per_page", fmt.Sprintf("%v",
reqData.Pager.Size))
diff --git a/backend/plugins/github/tasks/commit_collector.go
b/backend/plugins/github/tasks/commit_collector.go
index ca37c6227..5129a26c5 100644
--- a/backend/plugins/github/tasks/commit_collector.go
+++ b/backend/plugins/github/tasks/commit_collector.go
@@ -58,11 +58,9 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 100,
- Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
/*
url may use arbitrary variables from different source
in any order, we need GoTemplate to allow more
flexible for all kinds of possibility.
@@ -79,12 +77,7 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if collectorWithState.TimeAfter != nil {
- query.Set("since",
collectorWithState.TimeAfter.String())
- }
- if incremental {
- query.Set("since",
collectorWithState.LatestState.LatestSuccessStart.String())
- }
+ query.Set("since", collectorWithState.Since.String())
query.Set("direction", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("per_page", fmt.Sprintf("%v",
reqData.Pager.Size))
diff --git a/backend/plugins/github/tasks/issue_collector.go
b/backend/plugins/github/tasks/issue_collector.go
index c27eaea86..b084a767a 100644
--- a/backend/plugins/github/tasks/issue_collector.go
+++ b/backend/plugins/github/tasks/issue_collector.go
@@ -58,11 +58,9 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 100,
- Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
/*
url may use arbitrary variables from different source
in any order, we need GoTemplate to allow more
flexible for all kinds of possibility.
@@ -79,12 +77,7 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if collectorWithState.TimeAfter != nil {
- query.Set("since",
collectorWithState.TimeAfter.String())
- }
- if incremental {
- query.Set("since",
collectorWithState.LatestState.LatestSuccessStart.String())
- }
+ query.Set("since", collectorWithState.Since.String())
query.Set("direction", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("per_page", fmt.Sprintf("%v",
reqData.Pager.Size))
diff --git a/backend/plugins/github/tasks/pr_commit_collector.go
b/backend/plugins/github/tasks/pr_commit_collector.go
index 0a209ff2e..4b8707df1 100644
--- a/backend/plugins/github/tasks/pr_commit_collector.go
+++ b/backend/plugins/github/tasks/pr_commit_collector.go
@@ -73,20 +73,17 @@ func CollectApiPullRequestCommits(taskCtx
plugin.SubTaskContext) errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
-
clauses := []dal.Clause{
dal.Select("number, github_id"),
dal.From(models.GithubPullRequest{}.TableName()),
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
}
- // incremental collection, no need to care about the timeFilter since
it has to be collected by PR
- if incremental {
- clauses = append(
- clauses,
- dal.Where("github_updated_at > ?",
collectorWithState.LatestState.LatestSuccessStart),
- )
- }
+
+ clauses = append(
+ clauses,
+ dal.Where("github_updated_at > ?", collectorWithState.Since),
+ )
+
cursor, err := db.Cursor(
clauses...,
)
@@ -98,10 +95,9 @@ func CollectApiPullRequestCommits(taskCtx
plugin.SubTaskContext) errors.Error {
return err
}
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 100,
- Incremental: incremental,
- Input: iterator,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ Input: iterator,
UrlTemplate: "repos/{{ .Params.Name }}/pulls/{{ .Input.Number
}}/commits",
diff --git a/backend/plugins/github/tasks/pr_review_collector.go
b/backend/plugins/github/tasks/pr_review_collector.go
index 18bbde6b3..b9c97e193 100644
--- a/backend/plugins/github/tasks/pr_review_collector.go
+++ b/backend/plugins/github/tasks/pr_review_collector.go
@@ -65,18 +65,16 @@ func CollectApiPullRequestReviews(taskCtx
plugin.SubTaskContext) errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("number, github_id"),
dal.From(models.GithubPullRequest{}.TableName()),
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
}
- if incremental {
- clauses = append(
- clauses,
- dal.Where("github_updated_at > ?",
collectorWithState.LatestState.LatestSuccessStart),
- )
- }
+ clauses = append(
+ clauses,
+ dal.Where("github_updated_at > ?", collectorWithState.Since),
+ )
+
cursor, err := db.Cursor(
clauses...,
)
@@ -90,10 +88,9 @@ func CollectApiPullRequestReviews(taskCtx
plugin.SubTaskContext) errors.Error {
}
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 100,
- Incremental: incremental,
- Input: iterator,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ Input: iterator,
UrlTemplate: "repos/{{ .Params.Name }}/pulls/{{ .Input.Number
}}/reviews",
diff --git a/backend/plugins/github/tasks/pr_review_comment_collector.go
b/backend/plugins/github/tasks/pr_review_comment_collector.go
index eb3720ce0..26d4e762d 100644
--- a/backend/plugins/github/tasks/pr_review_comment_collector.go
+++ b/backend/plugins/github/tasks/pr_review_comment_collector.go
@@ -61,11 +61,9 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 100,
- Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
Header: func(reqData *helper.RequestData) (http.Header,
errors.Error) {
// Adding -H "Accept: application/vnd.github+json"
solve the issue of getting 502/403 error
header := http.Header{}
@@ -76,14 +74,7 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext)
errors.Error {
UrlTemplate: "repos/{{ .Params.Name }}/pulls/comments",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- if collectorWithState.TimeAfter != nil {
- // Note that `since` is for filtering records
by the `updated` time
- query.Set("since",
collectorWithState.TimeAfter.String())
- }
- // if incremental == true, we overwrite it
- if incremental {
- query.Set("since",
collectorWithState.LatestState.LatestSuccessStart.String())
- }
+ query.Set("since", collectorWithState.Since.String())
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("direction", "asc")
query.Set("per_page", fmt.Sprintf("%v",
reqData.Pager.Size))
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go
b/backend/plugins/github_graphql/tasks/issue_collector.go
index 7645db8d9..7239ddb05 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -109,22 +109,15 @@ func CollectIssue(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err =
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
GraphqlClient: data.GraphqlClient,
PageSize: 100,
- Incremental: incremental,
BuildQuery: func(reqData *helper.GraphqlRequestData)
(interface{}, map[string]interface{}, error) {
query := &GraphqlQueryIssueWrapper{}
if reqData == nil {
return query, map[string]interface{}{}, nil
}
- since := helper.DateTime{}
- if incremental {
- since = helper.DateTime{Time:
*collectorWithState.LatestState.LatestSuccessStart}
- } else if collectorWithState.TimeAfter != nil {
- since = helper.DateTime{Time:
*collectorWithState.TimeAfter}
- }
+ since := helper.DateTime{Time:
*collectorWithState.Since}
ownerName := strings.Split(data.Options.Name, "/")
variables := map[string]interface{}{
"since": since,
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go
b/backend/plugins/github_graphql/tasks/job_collector.go
index 095276c4a..82d8c6904 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -116,17 +116,14 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
-
clauses := []dal.Clause{
dal.Select("check_suite_node_id"),
dal.From(models.GithubRun{}.TableName()),
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
dal.Orderby("github_updated_at DESC"),
}
- if incremental {
- clauses = append(clauses, dal.Where("github_updated_at > ?",
*collectorWithState.LatestState.LatestSuccessStart))
- }
+ clauses = append(clauses, dal.Where("github_updated_at > ?",
*collectorWithState.Since))
+
cursor, err := db.Cursor(
clauses...,
)
@@ -142,7 +139,6 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext)
errors.Error {
err =
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
Input: iterator,
InputStep: 20,
- Incremental: incremental,
GraphqlClient: data.GraphqlClient,
BuildQuery: func(reqData *helper.GraphqlRequestData)
(interface{}, map[string]interface{}, error) {
query := &GraphqlQueryCheckRunWrapper{}
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go
b/backend/plugins/github_graphql/tasks/pr_collector.go
index afd8ff0e8..81437a451 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -160,11 +160,9 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error
{
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitGraphQLCollector(api.GraphqlCollectorArgs{
GraphqlClient: data.GraphqlClient,
PageSize: 10,
- Incremental: incremental,
/*
(Optional) Return query string for request, or you can
plug them into UrlTemplate directly
*/
diff --git a/backend/plugins/gitlab/tasks/issue_collector.go
b/backend/plugins/gitlab/tasks/issue_collector.go
index 0829f9cde..24b813d0d 100644
--- a/backend/plugins/gitlab/tasks/issue_collector.go
+++ b/backend/plugins/gitlab/tasks/issue_collector.go
@@ -51,11 +51,9 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 100,
- Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
UrlTemplate: "projects/{{ .Params.ProjectId }}/issues",
/*
@@ -63,12 +61,7 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
*/
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- if collectorWithState.TimeAfter != nil {
- query.Set("updated_after",
collectorWithState.TimeAfter.Format(time.RFC3339))
- }
- if incremental {
- query.Set("updated_after",
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
- }
+ query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
query.Set("sort", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("per_page", fmt.Sprintf("%v",
reqData.Pager.Size))
diff --git a/backend/plugins/gitlab/tasks/mr_collector.go
b/backend/plugins/gitlab/tasks/mr_collector.go
index c55fe760e..558816dfb 100644
--- a/backend/plugins/gitlab/tasks/mr_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_collector.go
@@ -48,11 +48,9 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: incremental,
UrlTemplate: "projects/{{ .Params.ProjectId
}}/merge_requests",
GetTotalPages: GetTotalPagesFromResponse,
ResponseParser: GetRawMessageFromResponse,
@@ -61,12 +59,7 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return nil, err
}
- if collectorWithState.TimeAfter != nil {
- query.Set("updated_after",
collectorWithState.TimeAfter.Format(time.RFC3339))
- }
- if incremental {
- query.Set("updated_after",
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
- }
+ query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
return query, nil
},
})
diff --git a/backend/plugins/gitlab/tasks/mr_note_collector.go
b/backend/plugins/gitlab/tasks/mr_note_collector.go
index d6fde5751..282135982 100644
--- a/backend/plugins/gitlab/tasks/mr_note_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_note_collector.go
@@ -51,12 +51,9 @@ func CollectApiMergeRequestsNotes(taskCtx
plugin.SubTaskContext) errors.Error {
}
defer iterator.Close()
- incremental := collectorWithState.IsIncremental()
-
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: incremental,
Input: iterator,
UrlTemplate: "projects/{{ .Params.ProjectId
}}/merge_requests/{{ .Input.Iid }}/notes?system=false",
Query: GetQuery,
diff --git a/backend/plugins/gitlab/tasks/pipeline_collector.go
b/backend/plugins/gitlab/tasks/pipeline_collector.go
index c22cc872a..acd37d283 100644
--- a/backend/plugins/gitlab/tasks/pipeline_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_collector.go
@@ -53,22 +53,16 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
+
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
MinTickInterval: &tickInterval,
PageSize: 100,
- Incremental: incremental,
UrlTemplate: "projects/{{ .Params.ProjectId
}}/pipelines",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- if collectorWithState.TimeAfter != nil {
- query.Set("updated_after",
collectorWithState.TimeAfter.Format(time.RFC3339))
- }
- if incremental {
- query.Set("updated_after",
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
- }
+ query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
query.Set("with_stats", "true")
query.Set("sort", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
diff --git a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
index a37807b8c..df9c2d188 100644
--- a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
@@ -55,8 +55,6 @@ func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.IsIncremental()
-
iterator, err := GetPipelinesIterator(taskCtx, collectorWithState)
if err != nil {
return err
@@ -68,7 +66,6 @@ func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext)
errors.Error {
ApiClient: data.ApiClient,
MinTickInterval: &tickInterval,
Input: iterator,
- Incremental: incremental,
UrlTemplate: "projects/{{ .Params.ProjectId
}}/pipelines/{{ .Input.GitlabId }}",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
@@ -96,9 +93,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *hel
data.Options.ProjectId, data.Options.ConnectionId, true,
),
}
- if collectorWithState.LatestState.LatestSuccessStart != nil {
- clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.LatestState.LatestSuccessStart))
- }
+
+ clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.Since))
// construct the input iterator
cursor, err := db.Cursor(clauses...)
if err != nil {
diff --git a/backend/plugins/jenkins/tasks/stage_collector.go
b/backend/plugins/jenkins/tasks/stage_collector.go
index bf159c1d6..9b04ff977 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -67,12 +67,7 @@ func CollectApiStages(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and
tjb.job_name = ? and tjb.class = ?`,
data.Options.ConnectionId, data.Options.JobPath,
data.Options.JobName, "WorkflowRun"),
}
-
- incremental := collectorWithState.IsIncremental()
- if incremental && collectorWithState.LatestState.LatestSuccessStart !=
nil {
- clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
collectorWithState.LatestState.LatestSuccessStart))
- }
-
+ clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
collectorWithState.Since))
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
diff --git a/backend/plugins/jira/tasks/development_panel_collector.go
b/backend/plugins/jira/tasks/development_panel_collector.go
index 278052cdc..5baa72264 100644
--- a/backend/plugins/jira/tasks/development_panel_collector.go
+++ b/backend/plugins/jira/tasks/development_panel_collector.go
@@ -72,14 +72,8 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ?",
data.Options.ConnectionId, data.Options.BoardId),
}
- incremental := collectorWithState.IsIncremental()
- if incremental && collectorWithState.LatestState.LatestSuccessStart !=
nil {
- clauses = append(
- clauses,
- dal.Where("i.updated > ?",
collectorWithState.LatestState.LatestSuccessStart),
- )
- }
+ clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
cursor, err := db.Cursor(clauses...)
if err != nil {
logger.Error(err, "collect development panel error")
@@ -93,9 +87,8 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext)
errors.Error {
}
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- Input: iterator,
- Incremental: incremental,
+ ApiClient: data.ApiClient,
+ Input: iterator,
// the URL looks like:
//
https://merico.atlassian.net/rest/dev-status/1.0/issue/detail?issueId=25184&applicationType=GitLab&dataType=repository
UrlTemplate: "dev-status/1.0/issue/detail",
diff --git a/backend/plugins/jira/tasks/issue_changelog_collector.go
b/backend/plugins/jira/tasks/issue_changelog_collector.go
index 7b5fc7674..1d0ca97f2 100644
--- a/backend/plugins/jira/tasks/issue_changelog_collector.go
+++ b/backend/plugins/jira/tasks/issue_changelog_collector.go
@@ -71,13 +71,7 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ? AND
i.std_type != ? and i.changelog_total > 100", data.Options.ConnectionId,
data.Options.BoardId, "Epic"),
}
- incremental := collectorWithState.IsIncremental()
- if incremental && collectorWithState.LatestState.LatestSuccessStart !=
nil {
- clauses = append(
- clauses,
- dal.Where("i.updated > ?",
collectorWithState.LatestState.LatestSuccessStart),
- )
- }
+ clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
if logger.IsLevelEnabled(log.LOG_DEBUG) {
count, err := db.Count(clauses...)
@@ -102,7 +96,6 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: incremental,
GetTotalPages: GetTotalPagesFromResponse,
Input: iterator,
UrlTemplate: "api/3/issue/{{ .Input.IssueId }}/changelog",
diff --git a/backend/plugins/jira/tasks/issue_collector.go
b/backend/plugins/jira/tasks/issue_collector.go
index b00cd4ff3..3faaa9d51 100644
--- a/backend/plugins/jira/tasks/issue_collector.go
+++ b/backend/plugins/jira/tasks/issue_collector.go
@@ -70,19 +70,20 @@ func CollectIssues(taskCtx plugin.SubTaskContext)
errors.Error {
// build jql
// IMPORTANT: we have to keep paginated data in a consistence order to
avoid data-missing, if we sort issues by
// `updated`, issue will be jumping between pages if it got updated
during the collection process
- incremental := collectorWithState.IsIncremental()
loc, err := getTimeZone(taskCtx)
if err != nil {
logger.Info("failed to get timezone, err: %v", err)
} else {
logger.Info("got user's timezone: %v", loc.String())
}
- jql := buildJQL(collectorWithState.TimeAfter,
collectorWithState.LatestState.LatestSuccessStart, incremental, loc)
+ jql := "ORDER BY created ASC"
+ if collectorWithState.Since != nil {
+ jql = buildJQL(*collectorWithState.Since, loc)
+ }
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: data.Options.PageSize,
- Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: data.Options.PageSize,
/*
url may use arbitrary variables from different
connection in any order, we need GoTemplate to allow more
flexible for all kinds of possibility.
@@ -146,23 +147,15 @@ func CollectIssues(taskCtx plugin.SubTaskContext)
errors.Error {
}
// buildJQL build jql based on timeAfter and incremental mode
-func buildJQL(timeAfter, latestSuccessStart *time.Time, isIncremental bool,
location *time.Location) string {
+func buildJQL(since time.Time, location *time.Location) string {
jql := "ORDER BY created ASC"
- var moment time.Time
- if timeAfter != nil {
- moment = *timeAfter
- }
- // if isIncremental is true, we should not collect data before
latestSuccessStart
- if isIncremental && latestSuccessStart.After(moment) {
- moment = *latestSuccessStart
- }
- if !moment.IsZero() {
+ if !since.IsZero() {
if location != nil {
- moment = moment.In(location)
+ since = since.In(location)
} else {
- moment = moment.In(time.UTC).Add(-24 * time.Hour)
+ since = since.In(time.UTC).Add(-24 * time.Hour)
}
- jql = fmt.Sprintf("updated >= '%s' %s",
moment.Format("2006/01/02 15:04"), jql)
+ jql = fmt.Sprintf("updated >= '%s' %s",
since.Format("2006/01/02 15:04"), jql)
}
return jql
}
diff --git a/backend/plugins/jira/tasks/issue_collector_test.go
b/backend/plugins/jira/tasks/issue_collector_test.go
index c9891943d..99bf5a533 100644
--- a/backend/plugins/jira/tasks/issue_collector_test.go
+++ b/backend/plugins/jira/tasks/issue_collector_test.go
@@ -26,13 +26,10 @@ func Test_buildJQL(t *testing.T) {
base := time.Date(2021, 2, 3, 4, 5, 6, 7, time.UTC)
timeAfter := base
add48 := base.Add(48 * time.Hour)
- minus48 := base.Add(-48 * time.Hour)
loc, _ := time.LoadLocation("Asia/Shanghai")
type args struct {
- timeAfter *time.Time
- latestSuccessStart *time.Time
- isIncremental bool
- location *time.Location
+ since *time.Time
+ location *time.Location
}
tests := []struct {
name string
@@ -42,56 +39,15 @@ func Test_buildJQL(t *testing.T) {
{
name: "test incremental",
args: args{
- timeAfter: nil,
- latestSuccessStart: nil,
- isIncremental: false,
- },
- want: "ORDER BY created ASC"},
- {
- name: "test incremental",
- args: args{
- timeAfter: nil,
- latestSuccessStart: &add48,
- isIncremental: true,
- location: loc,
- },
- want: "updated >= '2021/02/05 12:05' ORDER BY created
ASC",
- },
- {
- name: "test incremental",
- args: args{
- timeAfter: &base,
- latestSuccessStart: nil,
- isIncremental: false,
- location: loc,
- },
- want: "updated >= '2021/02/03 12:05' ORDER BY created
ASC",
- },
- {
- name: "test incremental",
- args: args{
- timeAfter: &timeAfter,
- latestSuccessStart: &add48,
- isIncremental: true,
- },
- want: "updated >= '2021/02/04 04:05' ORDER BY created
ASC",
- },
- {
- name: "test incremental",
- args: args{
- timeAfter: &timeAfter,
- latestSuccessStart: &add48,
- isIncremental: true,
- location: loc,
+ since: &add48,
+ location: loc,
},
want: "updated >= '2021/02/05 12:05' ORDER BY created
ASC",
},
{
name: "test incremental",
args: args{
- timeAfter: &timeAfter,
- latestSuccessStart: &minus48,
- isIncremental: true,
+ since: &timeAfter,
},
want: "updated >= '2021/02/02 04:05' ORDER BY created
ASC",
},
@@ -99,7 +55,7 @@ func Test_buildJQL(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if got := buildJQL(tt.args.timeAfter,
tt.args.latestSuccessStart, tt.args.isIncremental, tt.args.location); got !=
tt.want {
+ if got := buildJQL(*tt.args.since, tt.args.location);
got != tt.want {
t.Errorf("buildJQL() = %v, want %v", got,
tt.want)
}
})
diff --git a/backend/plugins/jira/tasks/issue_comment_collector.go
b/backend/plugins/jira/tasks/issue_comment_collector.go
index fb0d2168c..513d82b6b 100644
--- a/backend/plugins/jira/tasks/issue_comment_collector.go
+++ b/backend/plugins/jira/tasks/issue_comment_collector.go
@@ -71,13 +71,7 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ? AND
i.std_type != ? AND i.comment_total > 100", data.Options.ConnectionId,
data.Options.BoardId, "Epic"),
}
- incremental := collectorWithState.IsIncremental()
- if incremental && collectorWithState.LatestState.LatestSuccessStart !=
nil {
- clauses = append(
- clauses,
- dal.Where("i.updated > ?",
collectorWithState.LatestState.LatestSuccessStart),
- )
- }
+ clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
if logger.IsLevelEnabled(log.LOG_DEBUG) {
count, err := db.Count(clauses...)
@@ -102,7 +96,6 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: incremental,
GetTotalPages: GetTotalPagesFromResponse,
Input: iterator,
UrlTemplate: "api/3/issue/{{ .Input.IssueId }}/comment",
diff --git a/backend/plugins/jira/tasks/remotelink_collector.go
b/backend/plugins/jira/tasks/remotelink_collector.go
index 158640d5c..7f0207f9d 100644
--- a/backend/plugins/jira/tasks/remotelink_collector.go
+++ b/backend/plugins/jira/tasks/remotelink_collector.go
@@ -70,13 +70,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ?",
data.Options.ConnectionId, data.Options.BoardId),
}
- incremental := collectorWithState.IsIncremental()
- if incremental && collectorWithState.LatestState.LatestSuccessStart !=
nil {
- clauses = append(
- clauses,
- dal.Where("i.updated > ?",
collectorWithState.LatestState.LatestSuccessStart),
- )
- }
+ clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -93,7 +87,6 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext)
errors.Error {
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
Input: iterator,
- Incremental: incremental,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/remotelink",
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
if res.StatusCode == http.StatusNotFound {
diff --git a/backend/plugins/jira/tasks/worklog_collector.go
b/backend/plugins/jira/tasks/worklog_collector.go
index d0feab119..22e4bd4d9 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -66,20 +66,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where("i.updated > i.created AND bi.connection_id = ? AND
bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
dal.Groupby("i.issue_id, i.updated"),
}
- incremental := collectorWithState.IsIncremental()
- if incremental {
- clauses = append(clauses, dal.Having("i.updated > ? AND
(i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND
COUNT(wl.worklog_id) > 0))", collectorWithState.LatestState.LatestSuccessStart))
- } else {
- /*
- i.updated > max(rl.issue_updated) was deleted because
for non-incremental collection,
- max(rl.issue_updated) will only be one of null, less or
equal to i.updated
- so i.updated > max(rl.issue_updated) is always false.
- max(c.issue_updated) IS NULL AND COUNT(c.worklog_id) >
0 infers the issue has more than 100 worklogs,
- because we collected worklogs when collecting issues,
and assign worklog.issue_updated if num of worklogs < 100,
- and max(c.issue_updated) IS NULL AND
COUNT(c.worklog_id) > 0 means all worklogs for the issue were not assigned
issue_updated
- */
- clauses = append(clauses, dal.Having("max(wl.issue_updated) IS
NULL AND COUNT(wl.worklog_id) > 0"))
- }
+ clauses = append(clauses, dal.Having("i.updated > ? AND (i.updated >
max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND
COUNT(wl.worklog_id) > 0))", collectorWithState.Since))
// construct the input iterator
cursor, err := db.Cursor(clauses...)
@@ -96,7 +83,6 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
ApiClient: data.ApiClient,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/worklog",
PageSize: 50,
- Incremental: incremental,
GetTotalPages: GetTotalPagesFromResponse,
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
var data struct {
diff --git a/backend/plugins/tapd/tasks/bug_changelog_collector.go
b/backend/plugins/tapd/tasks/bug_changelog_collector.go
index 264e45342..7e88046cd 100644
--- a/backend/plugins/tapd/tasks/bug_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/bug_changelog_collector.go
@@ -38,10 +38,8 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "bug_changes",
@@ -51,12 +49,7 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created desc")
- if collectorWithState.TimeAfter != nil {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
- }
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/bug_collector.go
b/backend/plugins/tapd/tasks/bug_collector.go
index b5c85f68a..e4cf1e8d9 100644
--- a/backend/plugins/tapd/tasks/bug_collector.go
+++ b/backend/plugins/tapd/tasks/bug_collector.go
@@ -38,9 +38,8 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
+
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "bugs",
@@ -51,12 +50,7 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error
{
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
- }
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/bug_commit_collector.go
b/backend/plugins/tapd/tasks/bug_commit_collector.go
index 97a1c2937..e525721c4 100644
--- a/backend/plugins/tapd/tasks/bug_commit_collector.go
+++ b/backend/plugins/tapd/tasks/bug_commit_collector.go
@@ -44,18 +44,14 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
- incremental := collectorWithState.IsIncremental()
+
clauses := []dal.Clause{
dal.Select("_tool_tapd_bugs.id as issue_id, modified as
update_time"),
dal.From(&models.TapdBug{}),
dal.Where("_tool_tapd_bugs.connection_id = ? and
_tool_tapd_bugs.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.TimeAfter))
- }
- if incremental {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
- }
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
+
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
@@ -67,7 +63,6 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
- Incremental: incremental,
Input: iterator,
UrlTemplate: "code_commit_infos",
Query: func(reqData *api.RequestData) (url.Values,
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/iteration_collector.go
b/backend/plugins/tapd/tasks/iteration_collector.go
index 3a6ebc182..9e7d18ea1 100644
--- a/backend/plugins/tapd/tasks/iteration_collector.go
+++ b/backend/plugins/tapd/tasks/iteration_collector.go
@@ -40,9 +40,8 @@ func CollectIterations(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
+
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
Concurrency: 3,
@@ -53,12 +52,7 @@ func CollectIterations(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
- }
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
return query, nil
},
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/story_bug_collector.go
b/backend/plugins/tapd/tasks/story_bug_collector.go
index b9ac18dfa..87b3a8989 100644
--- a/backend/plugins/tapd/tasks/story_bug_collector.go
+++ b/backend/plugins/tapd/tasks/story_bug_collector.go
@@ -42,18 +42,14 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext)
errors.Error {
}
logger := taskCtx.GetLogger()
logger.Info("collect storyBugs")
- incremental := collectorWithState.IsIncremental()
+
clauses := []dal.Clause{
dal.Select("id as issue_id, modified as update_time"),
dal.From(&models.TapdStory{}),
dal.Where("_tool_tapd_stories.connection_id = ? and
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.TimeAfter))
- }
- if incremental {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
- }
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
+
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
@@ -64,7 +60,6 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext)
errors.Error {
}
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
- Incremental: incremental,
Input: iterator,
UrlTemplate: "stories/get_related_bugs",
Query: func(reqData *api.RequestData) (url.Values,
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/story_changelog_collector.go
b/backend/plugins/tapd/tasks/story_changelog_collector.go
index 7aa863417..1e07c4b3c 100644
--- a/backend/plugins/tapd/tasks/story_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/story_changelog_collector.go
@@ -38,10 +38,8 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
}
logger := taskCtx.GetLogger()
logger.Info("collect storyChangelogs")
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "story_changes",
@@ -51,12 +49,7 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
- }
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/story_collector.go
b/backend/plugins/tapd/tasks/story_collector.go
index 211c7f268..06715b0b7 100644
--- a/backend/plugins/tapd/tasks/story_collector.go
+++ b/backend/plugins/tapd/tasks/story_collector.go
@@ -38,9 +38,8 @@ func CollectStorys(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
+
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "stories",
@@ -51,12 +50,7 @@ func CollectStorys(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
- }
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/story_commit_collector.go
b/backend/plugins/tapd/tasks/story_commit_collector.go
index b69c6fa96..2f7e73bf9 100644
--- a/backend/plugins/tapd/tasks/story_commit_collector.go
+++ b/backend/plugins/tapd/tasks/story_commit_collector.go
@@ -44,18 +44,13 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
- incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("_tool_tapd_stories.id as issue_id, modified as
update_time"),
dal.From(&models.TapdStory{}),
dal.Where("_tool_tapd_stories.connection_id = ? and
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.TimeAfter))
- }
- if incremental {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
- }
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
+
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
@@ -66,7 +61,6 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
- Incremental: incremental,
Input: iterator,
UrlTemplate: "code_commit_infos",
Query: func(reqData *api.RequestData) (url.Values,
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/task_changelog_collector.go
b/backend/plugins/tapd/tasks/task_changelog_collector.go
index e4efec684..9824c8b43 100644
--- a/backend/plugins/tapd/tasks/task_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/task_changelog_collector.go
@@ -38,9 +38,7 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
}
logger := taskCtx.GetLogger()
logger.Info("collect taskChangelogs")
- incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "task_changes",
@@ -50,12 +48,7 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
- }
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/task_collector.go
b/backend/plugins/tapd/tasks/task_collector.go
index 3511fff11..76c12bb5c 100644
--- a/backend/plugins/tapd/tasks/task_collector.go
+++ b/backend/plugins/tapd/tasks/task_collector.go
@@ -38,10 +38,9 @@ func CollectTasks(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
+
collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "tasks",
@@ -52,12 +51,7 @@ func CollectTasks(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
- }
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/task_commit_collector.go
b/backend/plugins/tapd/tasks/task_commit_collector.go
index 88da97d6e..f3188ebb6 100644
--- a/backend/plugins/tapd/tasks/task_commit_collector.go
+++ b/backend/plugins/tapd/tasks/task_commit_collector.go
@@ -44,18 +44,13 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
- incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("_tool_tapd_tasks.id as issue_id, modified as
update_time"),
dal.From(&models.TapdTask{}),
dal.Where("_tool_tapd_tasks.connection_id = ? and
_tool_tapd_tasks.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.TimeAfter))
- }
- if incremental {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
- }
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
+
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
@@ -66,7 +61,6 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
- Incremental: incremental,
Input: iterator,
UrlTemplate: "code_commit_infos",
Query: func(reqData *api.RequestData) (url.Values,
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/worklog_collector.go
b/backend/plugins/tapd/tasks/worklog_collector.go
index 935f89758..e6be9d2d6 100644
--- a/backend/plugins/tapd/tasks/worklog_collector.go
+++ b/backend/plugins/tapd/tasks/worklog_collector.go
@@ -38,9 +38,8 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental()
+
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- Incremental: incremental,
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "timesheets",
@@ -50,12 +49,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if collectorWithState.TimeAfter != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
- }
- if incremental {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
- }
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/zentao/tasks/bug_commits_collector.go
b/backend/plugins/zentao/tasks/bug_commits_collector.go
index 526ffc8a4..a4b58befe 100644
--- a/backend/plugins/zentao/tasks/bug_commits_collector.go
+++ b/backend/plugins/zentao/tasks/bug_commits_collector.go
@@ -70,14 +70,8 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
data.Options.ProjectId, data.Options.ConnectionId,
),
}
- // incremental collection
- incremental := collectorWithState.IsIncremental()
- if incremental {
- clauses = append(
- clauses,
- dal.Where("last_edited_date is not null and
last_edited_date > ?", collectorWithState.LatestState.LatestSuccessStart),
- )
- }
+ clauses = append(clauses, dal.Where("last_edited_date is not null and
last_edited_date > ?", collectorWithState.Since))
+
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
@@ -95,7 +89,6 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
},
ApiClient: data.ApiClient,
Input: iterator,
- Incremental: incremental,
UrlTemplate: "bugs/{{ .Input.BugId }}",
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
var data struct {
diff --git a/backend/plugins/zentao/tasks/story_commits_collector.go
b/backend/plugins/zentao/tasks/story_commits_collector.go
index 117fdc7d9..33d07d13a 100644
--- a/backend/plugins/zentao/tasks/story_commits_collector.go
+++ b/backend/plugins/zentao/tasks/story_commits_collector.go
@@ -66,14 +66,8 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where(`_tool_zentao_project_stories.project_id = ? and
_tool_zentao_project_stories.connection_id = ?`,
data.Options.ProjectId, data.Options.ConnectionId),
}
- // incremental collection
- incremental := collectorWithState.IsIncremental()
- if incremental {
- clauses = append(
- clauses,
- dal.Where("last_edited_date is not null and
last_edited_date > ?", collectorWithState.LatestState.LatestSuccessStart),
- )
- }
+ clauses = append(clauses, dal.Where("last_edited_date is not null and
last_edited_date > ?", collectorWithState.Since))
+
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
@@ -93,7 +87,6 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
},
ApiClient: data.ApiClient,
Input: iterator,
- Incremental: incremental,
UrlTemplate: "stories/{{ .Input.ID }}",
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
var data struct {
diff --git a/backend/plugins/zentao/tasks/task_commits_collector.go
b/backend/plugins/zentao/tasks/task_commits_collector.go
index 4d4c54feb..8defc3ff0 100644
--- a/backend/plugins/zentao/tasks/task_commits_collector.go
+++ b/backend/plugins/zentao/tasks/task_commits_collector.go
@@ -64,14 +64,8 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
data.Options.ProjectId, data.Options.ConnectionId,
),
}
- // incremental collection
- incremental := collectorWithState.IsIncremental()
- if incremental {
- clauses = append(
- clauses,
- dal.Where("last_edited_date is not null and
last_edited_date > ?", collectorWithState.LatestState.LatestSuccessStart),
- )
- }
+ clauses = append(clauses, dal.Where("last_edited_date is not null and
last_edited_date > ?", collectorWithState.Since))
+
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
@@ -91,7 +85,6 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
},
ApiClient: data.ApiClient,
Input: iterator,
- Incremental: incremental,
UrlTemplate: "tasks/{{ .Input.ID }}",
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
var data struct {