This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch fix#6075-2
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/fix#6075-2 by this push:
new 3cfd68b28 fix: when since is nil and add collector IsIncreamtal
condition
3cfd68b28 is described below
commit 3cfd68b289d43cd3424494add13fc5df4e3594bc
Author: abeizn <[email protected]>
AuthorDate: Mon Sep 18 17:35:51 2023 +0800
fix: when since is nil and add collector IsIncreamtal condition
---
.../pluginhelper/api/api_collector_with_state.go | 40 +++++++++-------------
backend/plugins/bitbucket/tasks/api_common.go | 19 ++++++----
backend/plugins/github/tasks/cicd_job_collector.go | 5 +--
backend/plugins/github/tasks/comment_collector.go | 4 ++-
backend/plugins/github/tasks/commit_collector.go | 4 ++-
backend/plugins/github/tasks/issue_collector.go | 4 ++-
.../plugins/github/tasks/pr_commit_collector.go | 11 +++---
.../plugins/github/tasks/pr_review_collector.go | 10 +++---
.../github/tasks/pr_review_comment_collector.go | 4 ++-
.../github_graphql/tasks/issue_collector.go | 5 ++-
.../plugins/github_graphql/tasks/job_collector.go | 4 ++-
.../plugins/github_graphql/tasks/pr_collector.go | 2 +-
backend/plugins/gitlab/tasks/issue_collector.go | 4 ++-
backend/plugins/gitlab/tasks/mr_collector.go | 4 ++-
.../plugins/gitlab/tasks/mr_detail_collector.go | 4 ++-
backend/plugins/gitlab/tasks/pipeline_collector.go | 4 ++-
.../gitlab/tasks/pipeline_detail_collector.go | 5 +--
backend/plugins/gitlab/tasks/shared.go | 4 +--
backend/plugins/jenkins/tasks/stage_collector.go | 4 ++-
.../jira/tasks/development_panel_collector.go | 5 +--
.../jira/tasks/issue_changelog_collector.go | 4 ++-
.../plugins/jira/tasks/issue_comment_collector.go | 5 +--
backend/plugins/jira/tasks/remotelink_collector.go | 5 +--
backend/plugins/jira/tasks/worklog_collector.go | 15 ++++----
.../plugins/tapd/tasks/bug_changelog_collector.go | 4 ++-
backend/plugins/tapd/tasks/bug_collector.go | 4 ++-
backend/plugins/tapd/tasks/bug_commit_collector.go | 5 +--
backend/plugins/tapd/tasks/iteration_collector.go | 4 ++-
backend/plugins/tapd/tasks/story_bug_collector.go | 5 +--
.../tapd/tasks/story_changelog_collector.go | 4 ++-
backend/plugins/tapd/tasks/story_collector.go | 4 ++-
.../plugins/tapd/tasks/story_commit_collector.go | 5 +--
.../plugins/tapd/tasks/task_changelog_collector.go | 4 ++-
backend/plugins/tapd/tasks/task_collector.go | 4 ++-
.../plugins/tapd/tasks/task_commit_collector.go | 5 +--
backend/plugins/tapd/tasks/worklog_collector.go | 4 ++-
.../plugins/zentao/tasks/bug_commits_collector.go | 5 +--
.../zentao/tasks/story_commits_collector.go | 5 +--
.../plugins/zentao/tasks/task_commits_collector.go | 5 +--
39 files changed, 147 insertions(+), 95 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go
b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 988d73d97..007f34f42 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -37,7 +37,7 @@ type ApiCollectorStateManager struct {
// *GraphqlCollector
subtasks []plugin.SubTask
LatestState models.CollectorLatestState
- NewState models.CollectorLatestState
+ newState models.CollectorLatestState
IsIncreamtal bool
Since *time.Time
}
@@ -68,36 +68,28 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager
oldTimeAfter := oldState.TimeAfter
oldLatestSuccessStart := oldState.LatestSuccessStart
- // Get syncPolicy timeAfter and fullSync from context
- var newTimeAfter *time.Time
- var fullSync bool
- syncPolicy := args.Ctx.TaskContext().SyncPolicy()
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- newTimeAfter = syncPolicy.TimeAfter
- fullSync = syncPolicy.FullSync
- }
-
// Calculate incremental and since based on syncPolicy and old state
+ syncPolicy := args.Ctx.TaskContext().SyncPolicy()
var isIncreamtal bool
var since *time.Time
- if oldLatestSuccessStart == nil {
- // 1. If no oldState.LatestSuccessStart, not incremental and
since is syncPolicy.TimeAfter
- isIncreamtal = false
- since = newTimeAfter
- } else if syncPolicy == nil {
- // 2. If no syncPolicy, incremental and since is
oldState.LatestSuccessStart
+ if syncPolicy == nil {
+ // 1. If no syncPolicy, incremental and since is
oldState.LatestSuccessStart
isIncreamtal = true
since = oldLatestSuccessStart
- } else if fullSync {
+ } else if oldLatestSuccessStart == nil {
+ // 2. If no oldState.LatestSuccessStart, not incremental and
since is syncPolicy.TimeAfter
+ isIncreamtal = false
+ since = syncPolicy.TimeAfter
+ } else if syncPolicy.FullSync {
// 3. If fullSync true, not incremental and since is
syncPolicy.TimeAfter
isIncreamtal = false
- since = newTimeAfter
- } else if newTimeAfter != nil {
+ since = syncPolicy.TimeAfter
+ } else if syncPolicy.TimeAfter != nil {
// 4. If syncPolicy.TimeAfter is not nil, compare with old
oldState.TimeAfter
- if oldTimeAfter == nil || !newTimeAfter.Before(*oldTimeAfter) {
+ if oldTimeAfter == nil ||
!syncPolicy.TimeAfter.Before(*oldTimeAfter) {
isIncreamtal = false
- since = newTimeAfter
+ since = syncPolicy.TimeAfter
} else {
isIncreamtal = true
since = oldLatestSuccessStart
@@ -106,11 +98,11 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager
currentTime := time.Now()
oldState.LatestSuccessStart = &currentTime
- oldState.TimeAfter = newTimeAfter
+ oldState.TimeAfter = syncPolicy.TimeAfter
return &ApiCollectorStateManager{
RawDataSubTaskArgs: args,
- NewState: oldState,
+ newState: oldState,
IsIncreamtal: isIncreamtal,
Since: since,
}, nil
@@ -150,7 +142,7 @@ func (m *ApiCollectorStateManager) Execute() errors.Error {
}
db := m.Ctx.GetDal()
- return db.CreateOrUpdate(&m.NewState)
+ return db.CreateOrUpdate(&m.newState)
}
// NewStatefulApiCollectorForFinalizableEntity aims to add timeFilter/diffSync
support for
diff --git a/backend/plugins/bitbucket/tasks/api_common.go
b/backend/plugins/bitbucket/tasks/api_common.go
index 31bf47d73..6e03c3da9 100644
--- a/backend/plugins/bitbucket/tasks/api_common.go
+++ b/backend/plugins/bitbucket/tasks/api_common.go
@@ -101,7 +101,10 @@ func GetQueryCreatedAndUpdated(fields string,
collectorWithState *api.ApiCollect
}
query.Set("fields", fields)
query.Set("sort", "created_on")
- query.Set("q", fmt.Sprintf("updated_on>=%s",
collectorWithState.Since.Format(time.RFC3339)))
+
+ if collectorWithState.IsIncreamtal && collectorWithState.Since
!= nil {
+ query.Set("q", fmt.Sprintf("updated_on>=%s",
collectorWithState.Since.Format(time.RFC3339)))
+ }
return query, nil
}
}
@@ -172,7 +175,9 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext,
collectorWithState *
data.Options.FullName, data.Options.ConnectionId,
),
}
- clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.Since))
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.Since))
+ }
// construct the input iterator
cursor, err := db.Cursor(clauses...)
@@ -194,8 +199,9 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *api.Ap
data.Options.FullName, data.Options.ConnectionId,
),
}
- clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.Since))
-
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.Since))
+ }
// construct the input iterator
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -216,8 +222,9 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *api
data.Options.FullName, data.Options.ConnectionId,
),
}
- clauses = append(clauses, dal.Where("bitbucket_complete_on > ?",
*collectorWithState.Since))
-
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("bitbucket_complete_on >
?", *collectorWithState.Since))
+ }
// construct the input iterator
cursor, err := db.Cursor(clauses...)
if err != nil {
diff --git a/backend/plugins/github/tasks/cicd_job_collector.go
b/backend/plugins/github/tasks/cicd_job_collector.go
index da2e977c8..93617f779 100644
--- a/backend/plugins/github/tasks/cicd_job_collector.go
+++ b/backend/plugins/github/tasks/cicd_job_collector.go
@@ -73,8 +73,9 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
data.Options.GithubId, data.Options.ConnectionId,
),
}
- clauses = append(clauses, dal.Where("github_updated_at > ?",
collectorWithState.Since))
-
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("github_updated_at > ?",
collectorWithState.Since))
+ }
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
diff --git a/backend/plugins/github/tasks/comment_collector.go
b/backend/plugins/github/tasks/comment_collector.go
index 5c2d2afd2..73dfad834 100644
--- a/backend/plugins/github/tasks/comment_collector.go
+++ b/backend/plugins/github/tasks/comment_collector.go
@@ -65,7 +65,9 @@ func CollectApiComments(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- query.Set("since", collectorWithState.Since.String())
+ if collectorWithState.Since != nil {
+ query.Set("since",
collectorWithState.Since.String())
+ }
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("direction", "asc")
query.Set("per_page", fmt.Sprintf("%v",
reqData.Pager.Size))
diff --git a/backend/plugins/github/tasks/commit_collector.go
b/backend/plugins/github/tasks/commit_collector.go
index 5129a26c5..5a9be44db 100644
--- a/backend/plugins/github/tasks/commit_collector.go
+++ b/backend/plugins/github/tasks/commit_collector.go
@@ -77,7 +77,9 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- query.Set("since", collectorWithState.Since.String())
+ if collectorWithState.Since != nil {
+ query.Set("since",
collectorWithState.Since.String())
+ }
query.Set("direction", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("per_page", fmt.Sprintf("%v",
reqData.Pager.Size))
diff --git a/backend/plugins/github/tasks/issue_collector.go
b/backend/plugins/github/tasks/issue_collector.go
index b084a767a..67d2ce506 100644
--- a/backend/plugins/github/tasks/issue_collector.go
+++ b/backend/plugins/github/tasks/issue_collector.go
@@ -77,7 +77,9 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- query.Set("since", collectorWithState.Since.String())
+ if collectorWithState.Since != nil {
+ query.Set("since",
collectorWithState.Since.String())
+ }
query.Set("direction", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("per_page", fmt.Sprintf("%v",
reqData.Pager.Size))
diff --git a/backend/plugins/github/tasks/pr_commit_collector.go
b/backend/plugins/github/tasks/pr_commit_collector.go
index 4b8707df1..77c0e2a26 100644
--- a/backend/plugins/github/tasks/pr_commit_collector.go
+++ b/backend/plugins/github/tasks/pr_commit_collector.go
@@ -78,11 +78,12 @@ func CollectApiPullRequestCommits(taskCtx
plugin.SubTaskContext) errors.Error {
dal.From(models.GithubPullRequest{}.TableName()),
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
}
-
- clauses = append(
- clauses,
- dal.Where("github_updated_at > ?", collectorWithState.Since),
- )
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(
+ clauses,
+ dal.Where("github_updated_at > ?",
collectorWithState.Since),
+ )
+ }
cursor, err := db.Cursor(
clauses...,
diff --git a/backend/plugins/github/tasks/pr_review_collector.go
b/backend/plugins/github/tasks/pr_review_collector.go
index b9c97e193..6cc8d8ce9 100644
--- a/backend/plugins/github/tasks/pr_review_collector.go
+++ b/backend/plugins/github/tasks/pr_review_collector.go
@@ -70,10 +70,12 @@ func CollectApiPullRequestReviews(taskCtx
plugin.SubTaskContext) errors.Error {
dal.From(models.GithubPullRequest{}.TableName()),
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
}
- clauses = append(
- clauses,
- dal.Where("github_updated_at > ?", collectorWithState.Since),
- )
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(
+ clauses,
+ dal.Where("github_updated_at > ?",
collectorWithState.Since),
+ )
+ }
cursor, err := db.Cursor(
clauses...,
diff --git a/backend/plugins/github/tasks/pr_review_comment_collector.go
b/backend/plugins/github/tasks/pr_review_comment_collector.go
index 26d4e762d..e05ae2812 100644
--- a/backend/plugins/github/tasks/pr_review_comment_collector.go
+++ b/backend/plugins/github/tasks/pr_review_comment_collector.go
@@ -74,7 +74,9 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext)
errors.Error {
UrlTemplate: "repos/{{ .Params.Name }}/pulls/comments",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- query.Set("since", collectorWithState.Since.String())
+ if collectorWithState.Since != nil {
+ query.Set("since",
collectorWithState.Since.String())
+ }
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("direction", "asc")
query.Set("per_page", fmt.Sprintf("%v",
reqData.Pager.Size))
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go
b/backend/plugins/github_graphql/tasks/issue_collector.go
index 7239ddb05..cfb0459e4 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -117,7 +117,10 @@ func CollectIssue(taskCtx plugin.SubTaskContext)
errors.Error {
if reqData == nil {
return query, map[string]interface{}{}, nil
}
- since := helper.DateTime{Time:
*collectorWithState.Since}
+ since := helper.DateTime{}
+ if collectorWithState.Since != nil {
+ since = helper.DateTime{Time:
*collectorWithState.Since}
+ }
ownerName := strings.Split(data.Options.Name, "/")
variables := map[string]interface{}{
"since": since,
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go
b/backend/plugins/github_graphql/tasks/job_collector.go
index 82d8c6904..41836d3a8 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -122,7 +122,9 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
dal.Orderby("github_updated_at DESC"),
}
- clauses = append(clauses, dal.Where("github_updated_at > ?",
*collectorWithState.Since))
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("github_updated_at > ?",
*collectorWithState.Since))
+ }
cursor, err := db.Cursor(
clauses...,
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go
b/backend/plugins/github_graphql/tasks/pr_collector.go
index 25048af48..37d10e742 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -192,7 +192,7 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
isFinish := false
for _, rawL := range prs {
// collect data even though in increment mode
because of updating existing data
- if collectorWithState.LatestState.TimeAfter !=
nil && !collectorWithState.LatestState.TimeAfter.Before(rawL.UpdatedAt) {
+ if collectorWithState.Since != nil &&
!collectorWithState.Since.Before(rawL.UpdatedAt) {
isFinish = true
break
}
diff --git a/backend/plugins/gitlab/tasks/issue_collector.go
b/backend/plugins/gitlab/tasks/issue_collector.go
index 24b813d0d..81c78be37 100644
--- a/backend/plugins/gitlab/tasks/issue_collector.go
+++ b/backend/plugins/gitlab/tasks/issue_collector.go
@@ -61,7 +61,9 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
*/
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
+ if collectorWithState.Since != nil {
+ query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
+ }
query.Set("sort", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("per_page", fmt.Sprintf("%v",
reqData.Pager.Size))
diff --git a/backend/plugins/gitlab/tasks/mr_collector.go
b/backend/plugins/gitlab/tasks/mr_collector.go
index 558816dfb..ce0f21b3c 100644
--- a/backend/plugins/gitlab/tasks/mr_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_collector.go
@@ -59,7 +59,9 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return nil, err
}
- query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
+ if collectorWithState.Since != nil {
+ query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
+ }
return query, nil
},
})
diff --git a/backend/plugins/gitlab/tasks/mr_detail_collector.go
b/backend/plugins/gitlab/tasks/mr_detail_collector.go
index c74635942..f60871a24 100644
--- a/backend/plugins/gitlab/tasks/mr_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_detail_collector.go
@@ -85,7 +85,9 @@ func GetMergeRequestDetailsIterator(taskCtx
plugin.SubTaskContext, collectorWith
data.Options.ProjectId, data.Options.ConnectionId, true,
),
}
- clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.Since))
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.Since))
+ }
// construct the input iterator
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/gitlab/tasks/pipeline_collector.go
b/backend/plugins/gitlab/tasks/pipeline_collector.go
index acd37d283..06a14c925 100644
--- a/backend/plugins/gitlab/tasks/pipeline_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_collector.go
@@ -62,7 +62,9 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext)
errors.Error {
UrlTemplate: "projects/{{ .Params.ProjectId
}}/pipelines",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
+ if collectorWithState.Since != nil {
+ query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
+ }
query.Set("with_stats", "true")
query.Set("sort", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
diff --git a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
index df9c2d188..aa4c80148 100644
--- a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
@@ -93,8 +93,9 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *hel
data.Options.ProjectId, data.Options.ConnectionId, true,
),
}
-
- clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.Since))
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.Since))
+ }
// construct the input iterator
cursor, err := db.Cursor(clauses...)
if err != nil {
diff --git a/backend/plugins/gitlab/tasks/shared.go
b/backend/plugins/gitlab/tasks/shared.go
index 06cc46225..7f1e3e8c1 100644
--- a/backend/plugins/gitlab/tasks/shared.go
+++ b/backend/plugins/gitlab/tasks/shared.go
@@ -159,8 +159,8 @@ func GetMergeRequestsIterator(taskCtx
plugin.SubTaskContext, collectorWithState
),
}
if collectorWithState != nil {
- if collectorWithState.LatestState.LatestSuccessStart != nil {
- clauses = append(clauses, dal.Where("gitlab_updated_at
> ?", *collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("gitlab_updated_at
> ?", *collectorWithState.Since))
}
}
// construct the input iterator
diff --git a/backend/plugins/jenkins/tasks/stage_collector.go
b/backend/plugins/jenkins/tasks/stage_collector.go
index 9b04ff977..47530c95e 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -67,7 +67,9 @@ func CollectApiStages(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and
tjb.job_name = ? and tjb.class = ?`,
data.Options.ConnectionId, data.Options.JobPath,
data.Options.JobName, "WorkflowRun"),
}
- clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
collectorWithState.LatestState.LatestSuccessStart))
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
collectorWithState.Since))
+ }
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
diff --git a/backend/plugins/jira/tasks/development_panel_collector.go
b/backend/plugins/jira/tasks/development_panel_collector.go
index 5baa72264..64a7ea372 100644
--- a/backend/plugins/jira/tasks/development_panel_collector.go
+++ b/backend/plugins/jira/tasks/development_panel_collector.go
@@ -72,8 +72,9 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ?",
data.Options.ConnectionId, data.Options.BoardId),
}
-
- clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
+ }
cursor, err := db.Cursor(clauses...)
if err != nil {
logger.Error(err, "collect development panel error")
diff --git a/backend/plugins/jira/tasks/issue_changelog_collector.go
b/backend/plugins/jira/tasks/issue_changelog_collector.go
index 1d0ca97f2..bcbf46dc0 100644
--- a/backend/plugins/jira/tasks/issue_changelog_collector.go
+++ b/backend/plugins/jira/tasks/issue_changelog_collector.go
@@ -71,7 +71,9 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ? AND
i.std_type != ? and i.changelog_total > 100", data.Options.ConnectionId,
data.Options.BoardId, "Epic"),
}
- clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
+ }
if logger.IsLevelEnabled(log.LOG_DEBUG) {
count, err := db.Count(clauses...)
diff --git a/backend/plugins/jira/tasks/issue_comment_collector.go
b/backend/plugins/jira/tasks/issue_comment_collector.go
index 513d82b6b..71c91491e 100644
--- a/backend/plugins/jira/tasks/issue_comment_collector.go
+++ b/backend/plugins/jira/tasks/issue_comment_collector.go
@@ -71,8 +71,9 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ? AND
i.std_type != ? AND i.comment_total > 100", data.Options.ConnectionId,
data.Options.BoardId, "Epic"),
}
- clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.LatestState.LatestSuccessStart))
-
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
+ }
if logger.IsLevelEnabled(log.LOG_DEBUG) {
count, err := db.Count(clauses...)
if err != nil {
diff --git a/backend/plugins/jira/tasks/remotelink_collector.go
b/backend/plugins/jira/tasks/remotelink_collector.go
index 7f0207f9d..450723ec2 100644
--- a/backend/plugins/jira/tasks/remotelink_collector.go
+++ b/backend/plugins/jira/tasks/remotelink_collector.go
@@ -70,8 +70,9 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ?",
data.Options.ConnectionId, data.Options.BoardId),
}
- clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.LatestState.LatestSuccessStart))
-
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
+ }
cursor, err := db.Cursor(clauses...)
if err != nil {
logger.Error(err, "collect remotelink error")
diff --git a/backend/plugins/jira/tasks/worklog_collector.go
b/backend/plugins/jira/tasks/worklog_collector.go
index 4a08d9b10..71121597f 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -66,22 +66,19 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where("i.updated > i.created AND bi.connection_id = ? AND
bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
dal.Groupby("i.issue_id, i.updated"),
}
-
- isIncreamtal := collectorWithState.IsIncreamtal
- if isIncreamtal {
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
clauses = append(clauses, dal.Having("i.updated > ? AND
(i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND
COUNT(wl.worklog_id) > 0))", collectorWithState.Since))
} else {
/*
- i.updated > max(rl.issue_updated) was deleted because
for non-incremental collection,
- max(rl.issue_updated) will only be one of null, less or
equal to i.updated
- so i.updated > max(rl.issue_updated) is always false.
- max(c.issue_updated) IS NULL AND COUNT(c.worklog_id) >
0 infers the issue has more than 100 worklogs,
+ i.updated > max(wl.issue_updated) was deleted because
for non-incremental collection,
+ max(wl.issue_updated) will only be one of null, less or
equal to i.updated
+ so i.updated > max(wl.issue_updated) is always false.
+ max(wl.issue_updated) IS NULL AND COUNT(wl.worklog_id)
> 0 infers the issue has more than 100 worklogs,
because we collected worklogs when collecting issues,
and assign worklog.issue_updated if num of worklogs < 100,
- and max(c.issue_updated) IS NULL AND
COUNT(c.worklog_id) > 0 means all worklogs for the issue were not assigned
issue_updated
+ and max(wl.issue_updated) IS NULL AND
COUNT(wl.worklog_id) > 0 means all worklogs for the issue were not assigned
issue_updated
*/
clauses = append(clauses, dal.Having("max(wl.issue_updated) IS
NULL AND COUNT(wl.worklog_id) > 0"))
}
-
// construct the input iterator
cursor, err := db.Cursor(clauses...)
if err != nil {
diff --git a/backend/plugins/tapd/tasks/bug_changelog_collector.go
b/backend/plugins/tapd/tasks/bug_changelog_collector.go
index 7e88046cd..bf768e0d1 100644
--- a/backend/plugins/tapd/tasks/bug_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/bug_changelog_collector.go
@@ -49,7 +49,9 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created desc")
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ }
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/bug_collector.go
b/backend/plugins/tapd/tasks/bug_collector.go
index e4cf1e8d9..746c1bd89 100644
--- a/backend/plugins/tapd/tasks/bug_collector.go
+++ b/backend/plugins/tapd/tasks/bug_collector.go
@@ -50,7 +50,9 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ }
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/bug_commit_collector.go
b/backend/plugins/tapd/tasks/bug_commit_collector.go
index e525721c4..3164adb05 100644
--- a/backend/plugins/tapd/tasks/bug_commit_collector.go
+++ b/backend/plugins/tapd/tasks/bug_commit_collector.go
@@ -50,8 +50,9 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
dal.From(&models.TapdBug{}),
dal.Where("_tool_tapd_bugs.connection_id = ? and
_tool_tapd_bugs.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
-
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
+ }
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
diff --git a/backend/plugins/tapd/tasks/iteration_collector.go
b/backend/plugins/tapd/tasks/iteration_collector.go
index 9e7d18ea1..5f5512b1f 100644
--- a/backend/plugins/tapd/tasks/iteration_collector.go
+++ b/backend/plugins/tapd/tasks/iteration_collector.go
@@ -52,7 +52,9 @@ func CollectIterations(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ }
return query, nil
},
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/story_bug_collector.go
b/backend/plugins/tapd/tasks/story_bug_collector.go
index 87b3a8989..9d5d0ed0c 100644
--- a/backend/plugins/tapd/tasks/story_bug_collector.go
+++ b/backend/plugins/tapd/tasks/story_bug_collector.go
@@ -48,8 +48,9 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.From(&models.TapdStory{}),
dal.Where("_tool_tapd_stories.connection_id = ? and
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
-
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
+ }
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
diff --git a/backend/plugins/tapd/tasks/story_changelog_collector.go
b/backend/plugins/tapd/tasks/story_changelog_collector.go
index 1e07c4b3c..d0597b7ef 100644
--- a/backend/plugins/tapd/tasks/story_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/story_changelog_collector.go
@@ -49,7 +49,9 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ }
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/story_collector.go b/backend/plugins/tapd/tasks/story_collector.go
index 06715b0b7..095c72b3d 100644
--- a/backend/plugins/tapd/tasks/story_collector.go
+++ b/backend/plugins/tapd/tasks/story_collector.go
@@ -50,7 +50,9 @@ func CollectStorys(taskCtx plugin.SubTaskContext) errors.Error {
query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- query.Set("modified", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("modified", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ }
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/story_commit_collector.go b/backend/plugins/tapd/tasks/story_commit_collector.go
index 2f7e73bf9..708c71010 100644
--- a/backend/plugins/tapd/tasks/story_commit_collector.go
+++ b/backend/plugins/tapd/tasks/story_commit_collector.go
@@ -49,8 +49,9 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error {
dal.From(&models.TapdStory{}),
dal.Where("_tool_tapd_stories.connection_id = ? and _tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId, data.Options.WorkspaceId),
}
- clauses = append(clauses, dal.Where("modified > ?", *collectorWithState.Since))
-
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("modified > ?", *collectorWithState.Since))
+ }
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
diff --git a/backend/plugins/tapd/tasks/task_changelog_collector.go b/backend/plugins/tapd/tasks/task_changelog_collector.go
index 9824c8b43..7aba00c2a 100644
--- a/backend/plugins/tapd/tasks/task_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/task_changelog_collector.go
@@ -48,7 +48,9 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size))
query.Set("order", "created asc")
- query.Set("created", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("created", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ }
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/task_collector.go b/backend/plugins/tapd/tasks/task_collector.go
index 76c12bb5c..2d5bfdba3 100644
--- a/backend/plugins/tapd/tasks/task_collector.go
+++ b/backend/plugins/tapd/tasks/task_collector.go
@@ -51,7 +51,9 @@ func CollectTasks(taskCtx plugin.SubTaskContext) errors.Error {
query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- query.Set("modified", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("modified", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ }
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/task_commit_collector.go b/backend/plugins/tapd/tasks/task_commit_collector.go
index f3188ebb6..0ef938997 100644
--- a/backend/plugins/tapd/tasks/task_commit_collector.go
+++ b/backend/plugins/tapd/tasks/task_commit_collector.go
@@ -49,8 +49,9 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error {
dal.From(&models.TapdTask{}),
dal.Where("_tool_tapd_tasks.connection_id = ? and _tool_tapd_tasks.workspace_id = ? ", data.Options.ConnectionId, data.Options.WorkspaceId),
}
- clauses = append(clauses, dal.Where("modified > ?", *collectorWithState.Since))
-
+ if collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("modified > ?", *collectorWithState.Since))
+ }
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
diff --git a/backend/plugins/tapd/tasks/worklog_collector.go b/backend/plugins/tapd/tasks/worklog_collector.go
index e6be9d2d6..b091f9e44 100644
--- a/backend/plugins/tapd/tasks/worklog_collector.go
+++ b/backend/plugins/tapd/tasks/worklog_collector.go
@@ -49,7 +49,9 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size))
query.Set("order", "created asc")
- query.Set("modified", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.Since != nil {
+ query.Set("modified", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ }
return query, nil
},
ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/zentao/tasks/bug_commits_collector.go b/backend/plugins/zentao/tasks/bug_commits_collector.go
index a4b58befe..cb55ceda6 100644
--- a/backend/plugins/zentao/tasks/bug_commits_collector.go
+++ b/backend/plugins/zentao/tasks/bug_commits_collector.go
@@ -70,8 +70,9 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error {
data.Options.ProjectId, data.Options.ConnectionId,
),
}
- clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", collectorWithState.Since))
-
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", collectorWithState.Since))
+ }
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
diff --git a/backend/plugins/zentao/tasks/story_commits_collector.go b/backend/plugins/zentao/tasks/story_commits_collector.go
index 33d07d13a..a536f52f1 100644
--- a/backend/plugins/zentao/tasks/story_commits_collector.go
+++ b/backend/plugins/zentao/tasks/story_commits_collector.go
@@ -66,8 +66,9 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error {
dal.Where(`_tool_zentao_project_stories.project_id = ? and
_tool_zentao_project_stories.connection_id = ?`,
data.Options.ProjectId, data.Options.ConnectionId),
}
- clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", collectorWithState.Since))
-
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", collectorWithState.Since))
+ }
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
diff --git a/backend/plugins/zentao/tasks/task_commits_collector.go b/backend/plugins/zentao/tasks/task_commits_collector.go
index 8defc3ff0..665069508 100644
--- a/backend/plugins/zentao/tasks/task_commits_collector.go
+++ b/backend/plugins/zentao/tasks/task_commits_collector.go
@@ -64,8 +64,9 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error {
data.Options.ProjectId, data.Options.ConnectionId,
),
}
- clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", collectorWithState.Since))
-
+ if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", collectorWithState.Since))
+ }
cursor, err := db.Cursor(clauses...)
if err != nil {
return err