This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch fix#6075-2
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/fix#6075-2 by this push:
     new 3e5317f27 refactor: use since instead of timeafter,incremental,latestSuccessStart
3e5317f27 is described below

commit 3e5317f2766545ffc5a41b880e9127c3e3d78b61
Author: abeizn <[email protected]>
AuthorDate: Thu Sep 14 19:22:46 2023 +0800

    refactor: use since instead of timeafter,incremental,latestSuccessStart
---
 .../pluginhelper/api/api_collector_with_state.go   | 42 +++++++---------
 backend/plugins/bitbucket/tasks/api_common.go      | 24 +++-------
 backend/plugins/bitbucket/tasks/issue_collector.go |  1 -
 .../bitbucket/tasks/issue_comment_collector.go     |  1 -
 .../plugins/bitbucket/tasks/pipeline_collector.go  |  1 -
 .../bitbucket/tasks/pipeline_steps_collector.go    |  1 -
 backend/plugins/bitbucket/tasks/pr_collector.go    |  1 -
 .../bitbucket/tasks/pr_comment_collector.go        |  1 -
 .../plugins/bitbucket/tasks/pr_commit_collector.go |  1 -
 backend/plugins/github/tasks/cicd_job_collector.go | 11 +----
 backend/plugins/github/tasks/comment_collector.go  | 12 +----
 backend/plugins/github/tasks/commit_collector.go   | 13 ++---
 backend/plugins/github/tasks/issue_collector.go    | 13 ++---
 .../plugins/github/tasks/pr_commit_collector.go    | 22 ++++-----
 .../plugins/github/tasks/pr_review_collector.go    | 19 ++++----
 .../github/tasks/pr_review_comment_collector.go    | 15 ++----
 .../github_graphql/tasks/issue_collector.go        |  9 +---
 .../plugins/github_graphql/tasks/job_collector.go  |  8 +---
 .../plugins/github_graphql/tasks/pr_collector.go   |  2 -
 backend/plugins/gitlab/tasks/issue_collector.go    | 13 ++---
 backend/plugins/gitlab/tasks/mr_collector.go       |  9 +---
 backend/plugins/gitlab/tasks/mr_note_collector.go  |  3 --
 backend/plugins/gitlab/tasks/pipeline_collector.go | 10 +---
 .../gitlab/tasks/pipeline_detail_collector.go      |  8 +---
 backend/plugins/jenkins/tasks/stage_collector.go   |  7 +--
 .../jira/tasks/development_panel_collector.go      | 13 ++---
 .../jira/tasks/issue_changelog_collector.go        |  9 +---
 backend/plugins/jira/tasks/issue_collector.go      | 29 +++++------
 backend/plugins/jira/tasks/issue_collector_test.go | 56 +++-------------------
 .../plugins/jira/tasks/issue_comment_collector.go  |  9 +---
 backend/plugins/jira/tasks/remotelink_collector.go |  9 +---
 backend/plugins/jira/tasks/worklog_collector.go    | 16 +------
 .../plugins/tapd/tasks/bug_changelog_collector.go  |  9 +---
 backend/plugins/tapd/tasks/bug_collector.go        | 10 +---
 backend/plugins/tapd/tasks/bug_commit_collector.go | 11 ++---
 backend/plugins/tapd/tasks/iteration_collector.go  | 10 +---
 backend/plugins/tapd/tasks/story_bug_collector.go  | 11 ++---
 .../tapd/tasks/story_changelog_collector.go        |  9 +---
 backend/plugins/tapd/tasks/story_collector.go      | 10 +---
 .../plugins/tapd/tasks/story_commit_collector.go   | 10 +---
 .../plugins/tapd/tasks/task_changelog_collector.go |  9 +---
 backend/plugins/tapd/tasks/task_collector.go       | 10 +---
 .../plugins/tapd/tasks/task_commit_collector.go    | 10 +---
 backend/plugins/tapd/tasks/worklog_collector.go    | 10 +---
 .../plugins/zentao/tasks/bug_commits_collector.go  | 11 +----
 .../zentao/tasks/story_commits_collector.go        | 11 +----
 .../plugins/zentao/tasks/task_commits_collector.go | 11 +----
 47 files changed, 118 insertions(+), 422 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 813a75f73..a6b35e71c 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -38,6 +38,7 @@ type ApiCollectorStateManager struct {
        subtasks     []plugin.SubTask
        LatestState  models.CollectorLatestState
        TimeAfter    *time.Time
+       Since        *time.Time
        ExecuteStart time.Time
 }
 
@@ -66,32 +67,32 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs) 
(*ApiCollectorStateManager
        if syncPolicy != nil && syncPolicy.TimeAfter != nil {
                timeAfter = syncPolicy.TimeAfter
        }
+       since := GetSince(latestState.LatestSuccessStart, syncPolicy)
+
        return &ApiCollectorStateManager{
                RawDataSubTaskArgs: args,
                LatestState:        latestState,
                TimeAfter:          timeAfter,
+               Since:              since,
                ExecuteStart:       time.Now(),
        }, nil
 }
 
-// IsIncremental indicates if the collector should operate in incremental mode
-func (m *ApiCollectorStateManager) IsIncremental() bool {
-       prevSyncTime := m.LatestState.LatestSuccessStart
-       prevTimeAfter := m.LatestState.TimeAfter
-       syncPolicy := m.Ctx.TaskContext().SyncPolicy()
-       if syncPolicy != nil && syncPolicy.FullSync {
-               return false
+func GetSince(latestSuccessStart *time.Time, syncPolicy *models.SyncPolicy) 
*time.Time {
+       // If the execution has not been successful before, return timeAfter
+       if latestSuccessStart == nil {
+               return syncPolicy.TimeAfter
        }
-
-       if prevSyncTime == nil {
-               return false
+       // If syncPolicy is nil, return latestSuccessStart
+       if syncPolicy == nil {
+               return latestSuccessStart
        }
-       // if we cleared the timeAfter, or moved timeAfter back in time, we 
should do a full sync
-       currTimeAfter := syncPolicy.TimeAfter
-       if currTimeAfter != nil {
-               return prevTimeAfter == nil || 
!currTimeAfter.Before(*prevTimeAfter)
+       // If syncPolicy is fullSync or timeAfter is after latestSuccessStart, 
return timeAfter
+       if syncPolicy.FullSync || 
syncPolicy.TimeAfter.After(*latestSuccessStart) {
+               return syncPolicy.TimeAfter
        }
-       return prevTimeAfter == nil
+
+       return latestSuccessStart
 }
 
 // InitCollector init the embedded collector
@@ -165,19 +166,11 @@ func NewStatefulApiCollectorForFinalizableEntity(args 
FinalizableApiCollectorArg
                return nil, err
        }
 
-       var isIncremental = manager.IsIncremental()
-       var createdAfter *time.Time
-       if isIncremental {
-               createdAfter = manager.LatestState.LatestSuccessStart
-       } else {
-               createdAfter = manager.TimeAfter
-       }
-
+       createdAfter := manager.Since
        // step 1: create a collector to collect newly added records
        err = manager.InitCollector(ApiCollectorArgs{
                ApiClient: args.ApiClient,
                // common
-               Incremental: isIncremental,
                UrlTemplate: args.CollectNewRecordsByList.UrlTemplate,
                Query: func(reqData *RequestData) (url.Values, errors.Error) {
                        if args.CollectNewRecordsByList.Query != nil {
@@ -249,7 +242,6 @@ func NewStatefulApiCollectorForFinalizableEntity(args 
FinalizableApiCollectorArg
        err = manager.InitCollector(ApiCollectorArgs{
                ApiClient: args.ApiClient,
                // common
-               Incremental: true,
                Input:       input,
                UrlTemplate: args.CollectUnfinishedDetails.UrlTemplate,
                Query: func(reqData *RequestData) (url.Values, errors.Error) {
diff --git a/backend/plugins/bitbucket/tasks/api_common.go 
b/backend/plugins/bitbucket/tasks/api_common.go
index 284df3df2..31bf47d73 100644
--- a/backend/plugins/bitbucket/tasks/api_common.go
+++ b/backend/plugins/bitbucket/tasks/api_common.go
@@ -101,14 +101,7 @@ func GetQueryCreatedAndUpdated(fields string, 
collectorWithState *api.ApiCollect
                }
                query.Set("fields", fields)
                query.Set("sort", "created_on")
-               if collectorWithState.IsIncremental() {
-                       latestSuccessStart := 
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339)
-                       query.Set("q", fmt.Sprintf("updated_on>=%s", 
latestSuccessStart))
-               } else if collectorWithState.TimeAfter != nil {
-                       timeAfter := 
collectorWithState.TimeAfter.Format(time.RFC3339)
-                       query.Set("q", fmt.Sprintf("updated_on>=%s", timeAfter))
-               }
-
+               query.Set("q", fmt.Sprintf("updated_on>=%s", 
collectorWithState.Since.Format(time.RFC3339)))
                return query, nil
        }
 }
@@ -179,9 +172,8 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *
                        data.Options.FullName, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncremental() {
-               clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
-       }
+       clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*collectorWithState.Since))
+
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -202,9 +194,8 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *api.Ap
                        data.Options.FullName, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncremental() {
-               clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
-       }
+       clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*collectorWithState.Since))
+
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -225,9 +216,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *api
                        data.Options.FullName, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncremental() {
-               clauses = append(clauses, dal.Where("bitbucket_complete_on > 
?", *collectorWithState.LatestState.LatestSuccessStart))
-       }
+       clauses = append(clauses, dal.Where("bitbucket_complete_on > ?", 
*collectorWithState.Since))
+
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
        if err != nil {
diff --git a/backend/plugins/bitbucket/tasks/issue_collector.go 
b/backend/plugins/bitbucket/tasks/issue_collector.go
index a5f2e4a41..3edec1bea 100644
--- a/backend/plugins/bitbucket/tasks/issue_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_collector.go
@@ -43,7 +43,6 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
-               Incremental: collectorWithState.IsIncremental(),
                UrlTemplate: "repositories/{{ .Params.FullName }}/issues",
                Query: GetQueryCreatedAndUpdated(
                        `values.type,values.id,values.links.self,`+
diff --git a/backend/plugins/bitbucket/tasks/issue_comment_collector.go 
b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
index 8e0036c4a..cdc9c2e3d 100644
--- a/backend/plugins/bitbucket/tasks/issue_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
@@ -50,7 +50,6 @@ func CollectApiIssueComments(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
-               Incremental: collectorWithState.IsIncremental(),
                Input:       iterator,
                UrlTemplate: "repositories/{{ .Params.FullName }}/issues/{{ 
.Input.BitbucketId }}/comments",
                Query: GetQueryFields(
diff --git a/backend/plugins/bitbucket/tasks/pipeline_collector.go 
b/backend/plugins/bitbucket/tasks/pipeline_collector.go
index e25861936..d11a7f243 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_collector.go
@@ -43,7 +43,6 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    50,
-               Incremental: collectorWithState.IsIncremental(),
                UrlTemplate: "repositories/{{ .Params.FullName }}/pipelines/",
                Query: GetQueryCreatedAndUpdated(
                        
`values.uuid,values.type,values.state.name,values.state.result.name,values.state.result.type,values.state.stage.name,values.state.stage.type,`+
diff --git a/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go 
b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
index 99cc1e39b..8c9d70aa7 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
@@ -52,7 +52,6 @@ func CollectPipelineSteps(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
-               Incremental: collectorWithState.IsIncremental(),
                Input:       iterator,
                UrlTemplate: "repositories/{{ .Params.FullName }}/pipelines/{{ 
.Input.BitbucketId }}/steps/",
                Query: GetQueryFields(
diff --git a/backend/plugins/bitbucket/tasks/pr_collector.go 
b/backend/plugins/bitbucket/tasks/pr_collector.go
index 7379cc1e4..7aa9641e9 100644
--- a/backend/plugins/bitbucket/tasks/pr_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_collector.go
@@ -46,7 +46,6 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    50,
-               Incremental: collectorWithState.IsIncremental(),
                UrlTemplate: "repositories/{{ .Params.FullName }}/pullrequests",
                Query: GetQueryCreatedAndUpdated(
                        
`values.id,values.comment_count,values.type,values.state,values.title,values.description,`+
diff --git a/backend/plugins/bitbucket/tasks/pr_comment_collector.go 
b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
index c4d539f85..ee7ccd228 100644
--- a/backend/plugins/bitbucket/tasks/pr_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
@@ -50,7 +50,6 @@ func CollectApiPullRequestsComments(taskCtx 
plugin.SubTaskContext) errors.Error
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
-               Incremental: collectorWithState.IsIncremental(),
                Input:       iterator,
                UrlTemplate: "repositories/{{ .Params.FullName 
}}/pullrequests/{{ .Input.BitbucketId }}/comments",
                Query: GetQueryFields(
diff --git a/backend/plugins/bitbucket/tasks/pr_commit_collector.go 
b/backend/plugins/bitbucket/tasks/pr_commit_collector.go
index 9d99da39c..8ed4a5a11 100644
--- a/backend/plugins/bitbucket/tasks/pr_commit_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_commit_collector.go
@@ -52,7 +52,6 @@ func CollectApiPullRequestCommits(taskCtx 
plugin.SubTaskContext) errors.Error {
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:             data.ApiClient,
                PageSize:              100,
-               Incremental:           collectorWithState.IsIncremental(),
                Input:                 iterator,
                UrlTemplate:           "repositories/{{ .Params.FullName 
}}/pullrequests/{{ .Input.BitbucketId }}/commits",
                GetNextPageCustomData: GetNextPageCustomData,
diff --git a/backend/plugins/github/tasks/cicd_job_collector.go 
b/backend/plugins/github/tasks/cicd_job_collector.go
index 73fa2ab0d..da2e977c8 100644
--- a/backend/plugins/github/tasks/cicd_job_collector.go
+++ b/backend/plugins/github/tasks/cicd_job_collector.go
@@ -73,14 +73,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error 
{
                        data.Options.GithubId, data.Options.ConnectionId,
                ),
        }
-       // incremental collection
-       incremental := collectorWithState.IsIncremental()
-       if incremental {
-               clauses = append(
-                       clauses,
-                       dal.Where("github_updated_at > ?", 
collectorWithState.LatestState.LatestSuccessStart),
-               )
-       }
+       clauses = append(clauses, dal.Where("github_updated_at > ?", 
collectorWithState.Since))
+
        cursor, err := db.Cursor(clauses...)
        if err != nil {
                return err
@@ -102,7 +96,6 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error 
{
                ApiClient:   data.ApiClient,
                PageSize:    100,
                Input:       iterator,
-               Incremental: incremental,
                UrlTemplate: "repos/{{ .Params.Name }}/actions/runs/{{ 
.Input.ID }}/jobs",
                Query: func(reqData *api.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
diff --git a/backend/plugins/github/tasks/comment_collector.go 
b/backend/plugins/github/tasks/comment_collector.go
index 197a1f7a0..5c2d2afd2 100644
--- a/backend/plugins/github/tasks/comment_collector.go
+++ b/backend/plugins/github/tasks/comment_collector.go
@@ -58,24 +58,14 @@ func CollectApiComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
-               Incremental: incremental,
-
                UrlTemplate: "repos/{{ .Params.Name }}/issues/comments",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("state", "all")
-                       if collectorWithState.TimeAfter != nil {
-                               // Note that `since` is for filtering records 
by the `updated` time
-                               query.Set("since", 
collectorWithState.TimeAfter.String())
-                       }
-                       // if incremental == true, we overwrite it
-                       if incremental {
-                               query.Set("since", 
collectorWithState.LatestState.LatestSuccessStart.String())
-                       }
+                       query.Set("since", collectorWithState.Since.String())
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("direction", "asc")
                        query.Set("per_page", fmt.Sprintf("%v", 
reqData.Pager.Size))
diff --git a/backend/plugins/github/tasks/commit_collector.go 
b/backend/plugins/github/tasks/commit_collector.go
index ca37c6227..5129a26c5 100644
--- a/backend/plugins/github/tasks/commit_collector.go
+++ b/backend/plugins/github/tasks/commit_collector.go
@@ -58,11 +58,9 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               Incremental: incremental,
+               ApiClient: data.ApiClient,
+               PageSize:  100,
                /*
                        url may use arbitrary variables from different source 
in any order, we need GoTemplate to allow more
                        flexible for all kinds of possibility.
@@ -79,12 +77,7 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("state", "all")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("since", 
collectorWithState.TimeAfter.String())
-                       }
-                       if incremental {
-                               query.Set("since", 
collectorWithState.LatestState.LatestSuccessStart.String())
-                       }
+                       query.Set("since", collectorWithState.Since.String())
                        query.Set("direction", "asc")
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("per_page", fmt.Sprintf("%v", 
reqData.Pager.Size))
diff --git a/backend/plugins/github/tasks/issue_collector.go 
b/backend/plugins/github/tasks/issue_collector.go
index c27eaea86..b084a767a 100644
--- a/backend/plugins/github/tasks/issue_collector.go
+++ b/backend/plugins/github/tasks/issue_collector.go
@@ -58,11 +58,9 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               Incremental: incremental,
+               ApiClient: data.ApiClient,
+               PageSize:  100,
                /*
                        url may use arbitrary variables from different source 
in any order, we need GoTemplate to allow more
                        flexible for all kinds of possibility.
@@ -79,12 +77,7 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("state", "all")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("since", 
collectorWithState.TimeAfter.String())
-                       }
-                       if incremental {
-                               query.Set("since", 
collectorWithState.LatestState.LatestSuccessStart.String())
-                       }
+                       query.Set("since", collectorWithState.Since.String())
                        query.Set("direction", "asc")
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("per_page", fmt.Sprintf("%v", 
reqData.Pager.Size))
diff --git a/backend/plugins/github/tasks/pr_commit_collector.go 
b/backend/plugins/github/tasks/pr_commit_collector.go
index 0a209ff2e..4b8707df1 100644
--- a/backend/plugins/github/tasks/pr_commit_collector.go
+++ b/backend/plugins/github/tasks/pr_commit_collector.go
@@ -73,20 +73,17 @@ func CollectApiPullRequestCommits(taskCtx 
plugin.SubTaskContext) errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
-
        clauses := []dal.Clause{
                dal.Select("number, github_id"),
                dal.From(models.GithubPullRequest{}.TableName()),
                dal.Where("repo_id = ? and connection_id=?", 
data.Options.GithubId, data.Options.ConnectionId),
        }
-       // incremental collection, no need to care about the timeFilter since 
it has to be collected by PR
-       if incremental {
-               clauses = append(
-                       clauses,
-                       dal.Where("github_updated_at > ?", 
collectorWithState.LatestState.LatestSuccessStart),
-               )
-       }
+
+       clauses = append(
+               clauses,
+               dal.Where("github_updated_at > ?", collectorWithState.Since),
+       )
+
        cursor, err := db.Cursor(
                clauses...,
        )
@@ -98,10 +95,9 @@ func CollectApiPullRequestCommits(taskCtx 
plugin.SubTaskContext) errors.Error {
                return err
        }
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               Incremental: incremental,
-               Input:       iterator,
+               ApiClient: data.ApiClient,
+               PageSize:  100,
+               Input:     iterator,
 
                UrlTemplate: "repos/{{ .Params.Name }}/pulls/{{ .Input.Number 
}}/commits",
 
diff --git a/backend/plugins/github/tasks/pr_review_collector.go 
b/backend/plugins/github/tasks/pr_review_collector.go
index 18bbde6b3..b9c97e193 100644
--- a/backend/plugins/github/tasks/pr_review_collector.go
+++ b/backend/plugins/github/tasks/pr_review_collector.go
@@ -65,18 +65,16 @@ func CollectApiPullRequestReviews(taskCtx 
plugin.SubTaskContext) errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        clauses := []dal.Clause{
                dal.Select("number, github_id"),
                dal.From(models.GithubPullRequest{}.TableName()),
                dal.Where("repo_id = ? and connection_id=?", 
data.Options.GithubId, data.Options.ConnectionId),
        }
-       if incremental {
-               clauses = append(
-                       clauses,
-                       dal.Where("github_updated_at > ?", 
collectorWithState.LatestState.LatestSuccessStart),
-               )
-       }
+       clauses = append(
+               clauses,
+               dal.Where("github_updated_at > ?", collectorWithState.Since),
+       )
+
        cursor, err := db.Cursor(
                clauses...,
        )
@@ -90,10 +88,9 @@ func CollectApiPullRequestReviews(taskCtx 
plugin.SubTaskContext) errors.Error {
        }
 
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               Incremental: incremental,
-               Input:       iterator,
+               ApiClient: data.ApiClient,
+               PageSize:  100,
+               Input:     iterator,
 
                UrlTemplate: "repos/{{ .Params.Name }}/pulls/{{ .Input.Number 
}}/reviews",
 
diff --git a/backend/plugins/github/tasks/pr_review_comment_collector.go 
b/backend/plugins/github/tasks/pr_review_comment_collector.go
index eb3720ce0..26d4e762d 100644
--- a/backend/plugins/github/tasks/pr_review_comment_collector.go
+++ b/backend/plugins/github/tasks/pr_review_comment_collector.go
@@ -61,11 +61,9 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               Incremental: incremental,
+               ApiClient: data.ApiClient,
+               PageSize:  100,
                Header: func(reqData *helper.RequestData) (http.Header, 
errors.Error) {
                        // Adding -H "Accept: application/vnd.github+json" 
solve the issue of getting 502/403 error
                        header := http.Header{}
@@ -76,14 +74,7 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                UrlTemplate: "repos/{{ .Params.Name }}/pulls/comments",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       if collectorWithState.TimeAfter != nil {
-                               // Note that `since` is for filtering records 
by the `updated` time
-                               query.Set("since", 
collectorWithState.TimeAfter.String())
-                       }
-                       // if incremental == true, we overwrite it
-                       if incremental {
-                               query.Set("since", 
collectorWithState.LatestState.LatestSuccessStart.String())
-                       }
+                       query.Set("since", collectorWithState.Since.String())
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("direction", "asc")
                        query.Set("per_page", fmt.Sprintf("%v", 
reqData.Pager.Size))
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go 
b/backend/plugins/github_graphql/tasks/issue_collector.go
index 7645db8d9..7239ddb05 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -109,22 +109,15 @@ func CollectIssue(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = 
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
                GraphqlClient: data.GraphqlClient,
                PageSize:      100,
-               Incremental:   incremental,
                BuildQuery: func(reqData *helper.GraphqlRequestData) 
(interface{}, map[string]interface{}, error) {
                        query := &GraphqlQueryIssueWrapper{}
                        if reqData == nil {
                                return query, map[string]interface{}{}, nil
                        }
-                       since := helper.DateTime{}
-                       if incremental {
-                               since = helper.DateTime{Time: 
*collectorWithState.LatestState.LatestSuccessStart}
-                       } else if collectorWithState.TimeAfter != nil {
-                               since = helper.DateTime{Time: 
*collectorWithState.TimeAfter}
-                       }
+                       since := helper.DateTime{Time: 
*collectorWithState.Since}
                        ownerName := strings.Split(data.Options.Name, "/")
                        variables := map[string]interface{}{
                                "since":      since,
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go 
b/backend/plugins/github_graphql/tasks/job_collector.go
index 095276c4a..82d8c6904 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -116,17 +116,14 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
-
        clauses := []dal.Clause{
                dal.Select("check_suite_node_id"),
                dal.From(models.GithubRun{}.TableName()),
                dal.Where("repo_id = ? and connection_id=?", 
data.Options.GithubId, data.Options.ConnectionId),
                dal.Orderby("github_updated_at DESC"),
        }
-       if incremental {
-               clauses = append(clauses, dal.Where("github_updated_at > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
-       }
+       clauses = append(clauses, dal.Where("github_updated_at > ?", 
*collectorWithState.Since))
+
        cursor, err := db.Cursor(
                clauses...,
        )
@@ -142,7 +139,6 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = 
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
                Input:         iterator,
                InputStep:     20,
-               Incremental:   incremental,
                GraphqlClient: data.GraphqlClient,
                BuildQuery: func(reqData *helper.GraphqlRequestData) 
(interface{}, map[string]interface{}, error) {
                        query := &GraphqlQueryCheckRunWrapper{}
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go 
b/backend/plugins/github_graphql/tasks/pr_collector.go
index afd8ff0e8..81437a451 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -160,11 +160,9 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error 
{
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitGraphQLCollector(api.GraphqlCollectorArgs{
                GraphqlClient: data.GraphqlClient,
                PageSize:      10,
-               Incremental:   incremental,
                /*
                        (Optional) Return query string for request, or you can 
plug them into UrlTemplate directly
                */
diff --git a/backend/plugins/gitlab/tasks/issue_collector.go 
b/backend/plugins/gitlab/tasks/issue_collector.go
index 0829f9cde..24b813d0d 100644
--- a/backend/plugins/gitlab/tasks/issue_collector.go
+++ b/backend/plugins/gitlab/tasks/issue_collector.go
@@ -51,11 +51,9 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               Incremental: incremental,
+               ApiClient: data.ApiClient,
+               PageSize:  100,
 
                UrlTemplate: "projects/{{ .Params.ProjectId }}/issues",
                /*
@@ -63,12 +61,7 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                */
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("updated_after", 
collectorWithState.TimeAfter.Format(time.RFC3339))
-                       }
-                       if incremental {
-                               query.Set("updated_after", 
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
-                       }
+                       query.Set("updated_after", 
collectorWithState.Since.Format(time.RFC3339))
                        query.Set("sort", "asc")
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("per_page", fmt.Sprintf("%v", 
reqData.Pager.Size))
diff --git a/backend/plugins/gitlab/tasks/mr_collector.go 
b/backend/plugins/gitlab/tasks/mr_collector.go
index c55fe760e..558816dfb 100644
--- a/backend/plugins/gitlab/tasks/mr_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_collector.go
@@ -48,11 +48,9 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:      data.ApiClient,
                PageSize:       100,
-               Incremental:    incremental,
                UrlTemplate:    "projects/{{ .Params.ProjectId 
}}/merge_requests",
                GetTotalPages:  GetTotalPagesFromResponse,
                ResponseParser: GetRawMessageFromResponse,
@@ -61,12 +59,7 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
                        if err != nil {
                                return nil, err
                        }
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("updated_after", 
collectorWithState.TimeAfter.Format(time.RFC3339))
-                       }
-                       if incremental {
-                               query.Set("updated_after", 
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
-                       }
+                       query.Set("updated_after", 
collectorWithState.Since.Format(time.RFC3339))
                        return query, nil
                },
        })
diff --git a/backend/plugins/gitlab/tasks/mr_note_collector.go 
b/backend/plugins/gitlab/tasks/mr_note_collector.go
index d6fde5751..282135982 100644
--- a/backend/plugins/gitlab/tasks/mr_note_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_note_collector.go
@@ -51,12 +51,9 @@ func CollectApiMergeRequestsNotes(taskCtx 
plugin.SubTaskContext) errors.Error {
        }
        defer iterator.Close()
 
-       incremental := collectorWithState.IsIncremental()
-
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:      data.ApiClient,
                PageSize:       100,
-               Incremental:    incremental,
                Input:          iterator,
                UrlTemplate:    "projects/{{ .Params.ProjectId 
}}/merge_requests/{{ .Input.Iid }}/notes?system=false",
                Query:          GetQuery,
diff --git a/backend/plugins/gitlab/tasks/pipeline_collector.go 
b/backend/plugins/gitlab/tasks/pipeline_collector.go
index c22cc872a..acd37d283 100644
--- a/backend/plugins/gitlab/tasks/pipeline_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_collector.go
@@ -53,22 +53,16 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
+
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
                ApiClient:          data.ApiClient,
                MinTickInterval:    &tickInterval,
                PageSize:           100,
-               Incremental:        incremental,
                UrlTemplate:        "projects/{{ .Params.ProjectId 
}}/pipelines",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("updated_after", 
collectorWithState.TimeAfter.Format(time.RFC3339))
-                       }
-                       if incremental {
-                               query.Set("updated_after", 
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
-                       }
+                       query.Set("updated_after", 
collectorWithState.Since.Format(time.RFC3339))
                        query.Set("with_stats", "true")
                        query.Set("sort", "asc")
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
diff --git a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go 
b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
index a37807b8c..df9c2d188 100644
--- a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
@@ -55,8 +55,6 @@ func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
-
        iterator, err := GetPipelinesIterator(taskCtx, collectorWithState)
        if err != nil {
                return err
@@ -68,7 +66,6 @@ func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext) 
errors.Error {
                ApiClient:          data.ApiClient,
                MinTickInterval:    &tickInterval,
                Input:              iterator,
-               Incremental:        incremental,
                UrlTemplate:        "projects/{{ .Params.ProjectId 
}}/pipelines/{{ .Input.GitlabId }}",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
@@ -96,9 +93,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *hel
                        data.Options.ProjectId, data.Options.ConnectionId, true,
                ),
        }
-       if collectorWithState.LatestState.LatestSuccessStart != nil {
-               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
-       }
+
+       clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.Since))
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
        if err != nil {
diff --git a/backend/plugins/jenkins/tasks/stage_collector.go 
b/backend/plugins/jenkins/tasks/stage_collector.go
index bf159c1d6..9b04ff977 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -67,12 +67,7 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and 
tjb.job_name = ? and tjb.class = ?`,
                        data.Options.ConnectionId, data.Options.JobPath, 
data.Options.JobName, "WorkflowRun"),
        }
-
-       incremental := collectorWithState.IsIncremental()
-       if incremental && collectorWithState.LatestState.LatestSuccessStart != 
nil {
-               clauses = append(clauses, dal.Where(`tjb.start_time >= ?`, 
collectorWithState.LatestState.LatestSuccessStart))
-       }
-
+       clauses = append(clauses, dal.Where(`tjb.start_time >= ?`, 
collectorWithState.LatestState.LatestSuccessStart))
        cursor, err := db.Cursor(clauses...)
        if err != nil {
                return err
diff --git a/backend/plugins/jira/tasks/development_panel_collector.go 
b/backend/plugins/jira/tasks/development_panel_collector.go
index 278052cdc..5baa72264 100644
--- a/backend/plugins/jira/tasks/development_panel_collector.go
+++ b/backend/plugins/jira/tasks/development_panel_collector.go
@@ -72,14 +72,8 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ?", 
data.Options.ConnectionId, data.Options.BoardId),
        }
-       incremental := collectorWithState.IsIncremental()
-       if incremental && collectorWithState.LatestState.LatestSuccessStart != 
nil {
-               clauses = append(
-                       clauses,
-                       dal.Where("i.updated > ?", 
collectorWithState.LatestState.LatestSuccessStart),
-               )
-       }
 
+       clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
        cursor, err := db.Cursor(clauses...)
        if err != nil {
                logger.Error(err, "collect development panel error")
@@ -93,9 +87,8 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
 
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               Input:       iterator,
-               Incremental: incremental,
+               ApiClient: data.ApiClient,
+               Input:     iterator,
                // the URL looks like:
                // 
https://merico.atlassian.net/rest/dev-status/1.0/issue/detail?issueId=25184&applicationType=GitLab&dataType=repository
                UrlTemplate: "dev-status/1.0/issue/detail",
diff --git a/backend/plugins/jira/tasks/issue_changelog_collector.go 
b/backend/plugins/jira/tasks/issue_changelog_collector.go
index 7b5fc7674..1d0ca97f2 100644
--- a/backend/plugins/jira/tasks/issue_changelog_collector.go
+++ b/backend/plugins/jira/tasks/issue_changelog_collector.go
@@ -71,13 +71,7 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ? AND 
i.std_type != ? and i.changelog_total > 100", data.Options.ConnectionId, 
data.Options.BoardId, "Epic"),
        }
-       incremental := collectorWithState.IsIncremental()
-       if incremental && collectorWithState.LatestState.LatestSuccessStart != 
nil {
-               clauses = append(
-                       clauses,
-                       dal.Where("i.updated > ?", 
collectorWithState.LatestState.LatestSuccessStart),
-               )
-       }
+       clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
 
        if logger.IsLevelEnabled(log.LOG_DEBUG) {
                count, err := db.Count(clauses...)
@@ -102,7 +96,6 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:     data.ApiClient,
                PageSize:      100,
-               Incremental:   incremental,
                GetTotalPages: GetTotalPagesFromResponse,
                Input:         iterator,
                UrlTemplate:   "api/3/issue/{{ .Input.IssueId }}/changelog",
diff --git a/backend/plugins/jira/tasks/issue_collector.go 
b/backend/plugins/jira/tasks/issue_collector.go
index b00cd4ff3..3faaa9d51 100644
--- a/backend/plugins/jira/tasks/issue_collector.go
+++ b/backend/plugins/jira/tasks/issue_collector.go
@@ -70,19 +70,20 @@ func CollectIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
        // build jql
        // IMPORTANT: we have to keep paginated data in a consistence order to 
avoid data-missing, if we sort issues by
        //  `updated`, issue will be jumping between pages if it got updated 
during the collection process
-       incremental := collectorWithState.IsIncremental()
        loc, err := getTimeZone(taskCtx)
        if err != nil {
                logger.Info("failed to get timezone, err: %v", err)
        } else {
                logger.Info("got user's timezone: %v", loc.String())
        }
-       jql := buildJQL(collectorWithState.TimeAfter, 
collectorWithState.LatestState.LatestSuccessStart, incremental, loc)
+       jql := "ORDER BY created ASC"
+       if collectorWithState.Since != nil {
+               jql = buildJQL(*collectorWithState.Since, loc)
+       }
 
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    data.Options.PageSize,
-               Incremental: incremental,
+               ApiClient: data.ApiClient,
+               PageSize:  data.Options.PageSize,
                /*
                        url may use arbitrary variables from different 
connection in any order, we need GoTemplate to allow more
                        flexible for all kinds of possibility.
@@ -146,23 +147,15 @@ func CollectIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
 }
 
 // buildJQL build jql based on timeAfter and incremental mode
-func buildJQL(timeAfter, latestSuccessStart *time.Time, isIncremental bool, 
location *time.Location) string {
+func buildJQL(since time.Time, location *time.Location) string {
        jql := "ORDER BY created ASC"
-       var moment time.Time
-       if timeAfter != nil {
-               moment = *timeAfter
-       }
-       // if isIncremental is true, we should not collect data before 
latestSuccessStart
-       if isIncremental && latestSuccessStart.After(moment) {
-               moment = *latestSuccessStart
-       }
-       if !moment.IsZero() {
+       if !since.IsZero() {
                if location != nil {
-                       moment = moment.In(location)
+                       since = since.In(location)
                } else {
-                       moment = moment.In(time.UTC).Add(-24 * time.Hour)
+                       since = since.In(time.UTC).Add(-24 * time.Hour)
                }
-               jql = fmt.Sprintf("updated >= '%s' %s", 
moment.Format("2006/01/02 15:04"), jql)
+               jql = fmt.Sprintf("updated >= '%s' %s", 
since.Format("2006/01/02 15:04"), jql)
        }
        return jql
 }
diff --git a/backend/plugins/jira/tasks/issue_collector_test.go 
b/backend/plugins/jira/tasks/issue_collector_test.go
index c9891943d..99bf5a533 100644
--- a/backend/plugins/jira/tasks/issue_collector_test.go
+++ b/backend/plugins/jira/tasks/issue_collector_test.go
@@ -26,13 +26,10 @@ func Test_buildJQL(t *testing.T) {
        base := time.Date(2021, 2, 3, 4, 5, 6, 7, time.UTC)
        timeAfter := base
        add48 := base.Add(48 * time.Hour)
-       minus48 := base.Add(-48 * time.Hour)
        loc, _ := time.LoadLocation("Asia/Shanghai")
        type args struct {
-               timeAfter          *time.Time
-               latestSuccessStart *time.Time
-               isIncremental      bool
-               location           *time.Location
+               since    *time.Time
+               location *time.Location
        }
        tests := []struct {
                name string
@@ -42,56 +39,15 @@ func Test_buildJQL(t *testing.T) {
                {
                        name: "test incremental",
                        args: args{
-                               timeAfter:          nil,
-                               latestSuccessStart: nil,
-                               isIncremental:      false,
-                       },
-                       want: "ORDER BY created ASC"},
-               {
-                       name: "test incremental",
-                       args: args{
-                               timeAfter:          nil,
-                               latestSuccessStart: &add48,
-                               isIncremental:      true,
-                               location:           loc,
-                       },
-                       want: "updated >= '2021/02/05 12:05' ORDER BY created 
ASC",
-               },
-               {
-                       name: "test incremental",
-                       args: args{
-                               timeAfter:          &base,
-                               latestSuccessStart: nil,
-                               isIncremental:      false,
-                               location:           loc,
-                       },
-                       want: "updated >= '2021/02/03 12:05' ORDER BY created 
ASC",
-               },
-               {
-                       name: "test incremental",
-                       args: args{
-                               timeAfter:          &timeAfter,
-                               latestSuccessStart: &add48,
-                               isIncremental:      true,
-                       },
-                       want: "updated >= '2021/02/04 04:05' ORDER BY created 
ASC",
-               },
-               {
-                       name: "test incremental",
-                       args: args{
-                               timeAfter:          &timeAfter,
-                               latestSuccessStart: &add48,
-                               isIncremental:      true,
-                               location:           loc,
+                               since:    &add48,
+                               location: loc,
                        },
                        want: "updated >= '2021/02/05 12:05' ORDER BY created 
ASC",
                },
                {
                        name: "test incremental",
                        args: args{
-                               timeAfter:          &timeAfter,
-                               latestSuccessStart: &minus48,
-                               isIncremental:      true,
+                               since: &timeAfter,
                        },
                        want: "updated >= '2021/02/02 04:05' ORDER BY created 
ASC",
                },
@@ -99,7 +55,7 @@ func Test_buildJQL(t *testing.T) {
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       if got := buildJQL(tt.args.timeAfter, 
tt.args.latestSuccessStart, tt.args.isIncremental, tt.args.location); got != 
tt.want {
+                       if got := buildJQL(*tt.args.since, tt.args.location); 
got != tt.want {
                                t.Errorf("buildJQL() = %v, want %v", got, 
tt.want)
                        }
                })
diff --git a/backend/plugins/jira/tasks/issue_comment_collector.go 
b/backend/plugins/jira/tasks/issue_comment_collector.go
index fb0d2168c..513d82b6b 100644
--- a/backend/plugins/jira/tasks/issue_comment_collector.go
+++ b/backend/plugins/jira/tasks/issue_comment_collector.go
@@ -71,13 +71,7 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ? AND 
i.std_type != ? AND i.comment_total > 100", data.Options.ConnectionId, 
data.Options.BoardId, "Epic"),
        }
-       incremental := collectorWithState.IsIncremental()
-       if incremental && collectorWithState.LatestState.LatestSuccessStart != 
nil {
-               clauses = append(
-                       clauses,
-                       dal.Where("i.updated > ?", 
collectorWithState.LatestState.LatestSuccessStart),
-               )
-       }
+       clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.LatestState.LatestSuccessStart))
 
        if logger.IsLevelEnabled(log.LOG_DEBUG) {
                count, err := db.Count(clauses...)
@@ -102,7 +96,6 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:     data.ApiClient,
                PageSize:      100,
-               Incremental:   incremental,
                GetTotalPages: GetTotalPagesFromResponse,
                Input:         iterator,
                UrlTemplate:   "api/3/issue/{{ .Input.IssueId }}/comment",
diff --git a/backend/plugins/jira/tasks/remotelink_collector.go 
b/backend/plugins/jira/tasks/remotelink_collector.go
index 158640d5c..7f0207f9d 100644
--- a/backend/plugins/jira/tasks/remotelink_collector.go
+++ b/backend/plugins/jira/tasks/remotelink_collector.go
@@ -70,13 +70,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ?", 
data.Options.ConnectionId, data.Options.BoardId),
        }
-       incremental := collectorWithState.IsIncremental()
-       if incremental && collectorWithState.LatestState.LatestSuccessStart != 
nil {
-               clauses = append(
-                       clauses,
-                       dal.Where("i.updated > ?", 
collectorWithState.LatestState.LatestSuccessStart),
-               )
-       }
+       clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.LatestState.LatestSuccessStart))
 
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -93,7 +87,6 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                Input:       iterator,
-               Incremental: incremental,
                UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/remotelink",
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
                        if res.StatusCode == http.StatusNotFound {
diff --git a/backend/plugins/jira/tasks/worklog_collector.go 
b/backend/plugins/jira/tasks/worklog_collector.go
index d0feab119..22e4bd4d9 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -66,20 +66,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where("i.updated > i.created AND bi.connection_id = ?  AND 
bi.board_id = ?  ", data.Options.ConnectionId, data.Options.BoardId),
                dal.Groupby("i.issue_id, i.updated"),
        }
-       incremental := collectorWithState.IsIncremental()
-       if incremental {
-               clauses = append(clauses, dal.Having("i.updated > ? AND 
(i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND 
COUNT(wl.worklog_id) > 0))", collectorWithState.LatestState.LatestSuccessStart))
-       } else {
-               /*
-                       i.updated > max(rl.issue_updated) was deleted because 
for non-incremental collection,
-                       max(rl.issue_updated) will only be one of null, less or 
equal to i.updated
-                       so i.updated > max(rl.issue_updated) is always false.
-                       max(c.issue_updated) IS NULL AND COUNT(c.worklog_id) > 
0 infers the issue has more than 100 worklogs,
-                       because we collected worklogs when collecting issues, 
and assign worklog.issue_updated if num of worklogs < 100,
-                       and max(c.issue_updated) IS NULL AND 
COUNT(c.worklog_id) > 0 means all worklogs for the issue were not assigned 
issue_updated
-               */
-               clauses = append(clauses, dal.Having("max(wl.issue_updated) IS 
NULL AND COUNT(wl.worklog_id) > 0"))
-       }
+       clauses = append(clauses, dal.Having("i.updated > ? AND (i.updated > 
max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND 
COUNT(wl.worklog_id) > 0))", collectorWithState.Since))
 
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
@@ -96,7 +83,6 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                ApiClient:     data.ApiClient,
                UrlTemplate:   "api/2/issue/{{ .Input.IssueId }}/worklog",
                PageSize:      50,
-               Incremental:   incremental,
                GetTotalPages: GetTotalPagesFromResponse,
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
                        var data struct {
diff --git a/backend/plugins/tapd/tasks/bug_changelog_collector.go 
b/backend/plugins/tapd/tasks/bug_changelog_collector.go
index 264e45342..7e88046cd 100644
--- a/backend/plugins/tapd/tasks/bug_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/bug_changelog_collector.go
@@ -38,10 +38,8 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
 
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "bug_changes",
@@ -51,12 +49,7 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created desc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
+                       query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        return query, nil
                },
                ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/bug_collector.go 
b/backend/plugins/tapd/tasks/bug_collector.go
index b5c85f68a..e4cf1e8d9 100644
--- a/backend/plugins/tapd/tasks/bug_collector.go
+++ b/backend/plugins/tapd/tasks/bug_collector.go
@@ -38,9 +38,8 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
+
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "bugs",
@@ -51,12 +50,7 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error 
{
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
+                       query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        return query, nil
                },
                ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/bug_commit_collector.go 
b/backend/plugins/tapd/tasks/bug_commit_collector.go
index 97a1c2937..e525721c4 100644
--- a/backend/plugins/tapd/tasks/bug_commit_collector.go
+++ b/backend/plugins/tapd/tasks/bug_commit_collector.go
@@ -44,18 +44,14 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect issueCommits")
-       incremental := collectorWithState.IsIncremental()
+
        clauses := []dal.Clause{
                dal.Select("_tool_tapd_bugs.id as issue_id, modified as 
update_time"),
                dal.From(&models.TapdBug{}),
                dal.Where("_tool_tapd_bugs.connection_id = ? and 
_tool_tapd_bugs.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.TimeAfter))
-       }
-       if incremental {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
-       }
+       clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.Since))
+
        cursor, err := db.Cursor(clauses...)
        if err != nil {
                return err
@@ -67,7 +63,6 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
-               Incremental: incremental,
                Input:       iterator,
                UrlTemplate: "code_commit_infos",
                Query: func(reqData *api.RequestData) (url.Values, 
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/iteration_collector.go 
b/backend/plugins/tapd/tasks/iteration_collector.go
index 3a6ebc182..9e7d18ea1 100644
--- a/backend/plugins/tapd/tasks/iteration_collector.go
+++ b/backend/plugins/tapd/tasks/iteration_collector.go
@@ -40,9 +40,8 @@ func CollectIterations(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
+
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                Concurrency: 3,
@@ -53,12 +52,7 @@ func CollectIterations(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
+                       query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        return query, nil
                },
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/story_bug_collector.go 
b/backend/plugins/tapd/tasks/story_bug_collector.go
index b9ac18dfa..87b3a8989 100644
--- a/backend/plugins/tapd/tasks/story_bug_collector.go
+++ b/backend/plugins/tapd/tasks/story_bug_collector.go
@@ -42,18 +42,14 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect storyBugs")
-       incremental := collectorWithState.IsIncremental()
+
        clauses := []dal.Clause{
                dal.Select("id as issue_id, modified as update_time"),
                dal.From(&models.TapdStory{}),
                dal.Where("_tool_tapd_stories.connection_id = ? and 
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.TimeAfter))
-       }
-       if incremental {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
-       }
+       clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.Since))
+
        cursor, err := db.Cursor(clauses...)
        if err != nil {
                return err
@@ -64,7 +60,6 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
-               Incremental: incremental,
                Input:       iterator,
                UrlTemplate: "stories/get_related_bugs",
                Query: func(reqData *api.RequestData) (url.Values, 
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/story_changelog_collector.go 
b/backend/plugins/tapd/tasks/story_changelog_collector.go
index 7aa863417..1e07c4b3c 100644
--- a/backend/plugins/tapd/tasks/story_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/story_changelog_collector.go
@@ -38,10 +38,8 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect storyChangelogs")
-       incremental := collectorWithState.IsIncremental()
 
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "story_changes",
@@ -51,12 +49,7 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
+                       query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        return query, nil
                },
                ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/story_collector.go 
b/backend/plugins/tapd/tasks/story_collector.go
index 211c7f268..06715b0b7 100644
--- a/backend/plugins/tapd/tasks/story_collector.go
+++ b/backend/plugins/tapd/tasks/story_collector.go
@@ -38,9 +38,8 @@ func CollectStorys(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
+
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "stories",
@@ -51,12 +50,7 @@ func CollectStorys(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
+                       query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        return query, nil
                },
                ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/story_commit_collector.go 
b/backend/plugins/tapd/tasks/story_commit_collector.go
index b69c6fa96..2f7e73bf9 100644
--- a/backend/plugins/tapd/tasks/story_commit_collector.go
+++ b/backend/plugins/tapd/tasks/story_commit_collector.go
@@ -44,18 +44,13 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect issueCommits")
-       incremental := collectorWithState.IsIncremental()
        clauses := []dal.Clause{
                dal.Select("_tool_tapd_stories.id as issue_id, modified as 
update_time"),
                dal.From(&models.TapdStory{}),
                dal.Where("_tool_tapd_stories.connection_id = ? and 
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.TimeAfter))
-       }
-       if incremental {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
-       }
+       clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.Since))
+
        cursor, err := db.Cursor(clauses...)
        if err != nil {
                return err
@@ -66,7 +61,6 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
-               Incremental: incremental,
                Input:       iterator,
                UrlTemplate: "code_commit_infos",
                Query: func(reqData *api.RequestData) (url.Values, 
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/task_changelog_collector.go 
b/backend/plugins/tapd/tasks/task_changelog_collector.go
index e4efec684..9824c8b43 100644
--- a/backend/plugins/tapd/tasks/task_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/task_changelog_collector.go
@@ -38,9 +38,7 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect taskChangelogs")
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "task_changes",
@@ -50,12 +48,7 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
+                       query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        return query, nil
                },
                ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/task_collector.go 
b/backend/plugins/tapd/tasks/task_collector.go
index 3511fff11..76c12bb5c 100644
--- a/backend/plugins/tapd/tasks/task_collector.go
+++ b/backend/plugins/tapd/tasks/task_collector.go
@@ -38,10 +38,9 @@ func CollectTasks(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
+
        collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               Incremental:        incremental,
                ApiClient:          data.ApiClient,
                PageSize:           int(data.Options.PageSize),
                UrlTemplate:        "tasks",
@@ -52,12 +51,7 @@ func CollectTasks(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
+                       query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        return query, nil
                },
                ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/tapd/tasks/task_commit_collector.go 
b/backend/plugins/tapd/tasks/task_commit_collector.go
index 88da97d6e..f3188ebb6 100644
--- a/backend/plugins/tapd/tasks/task_commit_collector.go
+++ b/backend/plugins/tapd/tasks/task_commit_collector.go
@@ -44,18 +44,13 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect issueCommits")
-       incremental := collectorWithState.IsIncremental()
        clauses := []dal.Clause{
                dal.Select("_tool_tapd_tasks.id as issue_id, modified as 
update_time"),
                dal.From(&models.TapdTask{}),
                dal.Where("_tool_tapd_tasks.connection_id = ? and 
_tool_tapd_tasks.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.TimeAfter))
-       }
-       if incremental {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
-       }
+       clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.Since))
+
        cursor, err := db.Cursor(clauses...)
        if err != nil {
                return err
@@ -66,7 +61,6 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
-               Incremental: incremental,
                Input:       iterator,
                UrlTemplate: "code_commit_infos",
                Query: func(reqData *api.RequestData) (url.Values, 
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/worklog_collector.go 
b/backend/plugins/tapd/tasks/worklog_collector.go
index 935f89758..e6be9d2d6 100644
--- a/backend/plugins/tapd/tasks/worklog_collector.go
+++ b/backend/plugins/tapd/tasks/worklog_collector.go
@@ -38,9 +38,8 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
+
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "timesheets",
@@ -50,12 +49,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
+                       query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        return query, nil
                },
                ResponseParser: GetRawMessageArrayFromResponse,
diff --git a/backend/plugins/zentao/tasks/bug_commits_collector.go 
b/backend/plugins/zentao/tasks/bug_commits_collector.go
index 526ffc8a4..a4b58befe 100644
--- a/backend/plugins/zentao/tasks/bug_commits_collector.go
+++ b/backend/plugins/zentao/tasks/bug_commits_collector.go
@@ -70,14 +70,8 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                        data.Options.ProjectId, data.Options.ConnectionId,
                ),
        }
-       // incremental collection
-       incremental := collectorWithState.IsIncremental()
-       if incremental {
-               clauses = append(
-                       clauses,
-                       dal.Where("last_edited_date is not null and 
last_edited_date > ?", collectorWithState.LatestState.LatestSuccessStart),
-               )
-       }
+       clauses = append(clauses, dal.Where("last_edited_date is not null and 
last_edited_date > ?", collectorWithState.Since))
+
        cursor, err := db.Cursor(clauses...)
        if err != nil {
                return err
@@ -95,7 +89,6 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                },
                ApiClient:   data.ApiClient,
                Input:       iterator,
-               Incremental: incremental,
                UrlTemplate: "bugs/{{ .Input.BugId }}",
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
                        var data struct {
diff --git a/backend/plugins/zentao/tasks/story_commits_collector.go 
b/backend/plugins/zentao/tasks/story_commits_collector.go
index 117fdc7d9..33d07d13a 100644
--- a/backend/plugins/zentao/tasks/story_commits_collector.go
+++ b/backend/plugins/zentao/tasks/story_commits_collector.go
@@ -66,14 +66,8 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where(`_tool_zentao_project_stories.project_id = ? and
                        _tool_zentao_project_stories.connection_id = ?`, 
data.Options.ProjectId, data.Options.ConnectionId),
        }
-       // incremental collection
-       incremental := collectorWithState.IsIncremental()
-       if incremental {
-               clauses = append(
-                       clauses,
-                       dal.Where("last_edited_date is not null and 
last_edited_date > ?", collectorWithState.LatestState.LatestSuccessStart),
-               )
-       }
+       clauses = append(clauses, dal.Where("last_edited_date is not null and 
last_edited_date > ?", collectorWithState.Since))
+
        cursor, err := db.Cursor(clauses...)
        if err != nil {
                return err
@@ -93,7 +87,6 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                },
                ApiClient:   data.ApiClient,
                Input:       iterator,
-               Incremental: incremental,
                UrlTemplate: "stories/{{ .Input.ID }}",
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
                        var data struct {
diff --git a/backend/plugins/zentao/tasks/task_commits_collector.go 
b/backend/plugins/zentao/tasks/task_commits_collector.go
index 4d4c54feb..8defc3ff0 100644
--- a/backend/plugins/zentao/tasks/task_commits_collector.go
+++ b/backend/plugins/zentao/tasks/task_commits_collector.go
@@ -64,14 +64,8 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                        data.Options.ProjectId, data.Options.ConnectionId,
                ),
        }
-       // incremental collection
-       incremental := collectorWithState.IsIncremental()
-       if incremental {
-               clauses = append(
-                       clauses,
-                       dal.Where("last_edited_date is not null and 
last_edited_date > ?", collectorWithState.LatestState.LatestSuccessStart),
-               )
-       }
+       clauses = append(clauses, dal.Where("last_edited_date is not null and 
last_edited_date > ?", collectorWithState.Since))
+
        cursor, err := db.Cursor(clauses...)
        if err != nil {
                return err
@@ -91,7 +85,6 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                },
                ApiClient:   data.ApiClient,
                Input:       iterator,
-               Incremental: incremental,
                UrlTemplate: "tasks/{{ .Input.ID }}",
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
                        var data struct {

Reply via email to