This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b8e9687f refactor: use since instead of timeafter,incremental,latestSuccessStart (#6094)
3b8e9687f is described below

commit 3b8e9687f7221729146759f4dd06affe2da29fa4
Author: abeizn <[email protected]>
AuthorDate: Tue Sep 19 11:14:09 2023 +0800

    refactor: use since instead of timeafter,incremental,latestSuccessStart (#6094)
    
    * refactor: use since instead of timeafter,incremental,latestSuccessStart
    
    * fix: unify processing and simplify logic
    
    * fix: unify processing and simplify logic
    
    * fix: lint
    
    * fix: when since is nil and add collector IsIncreamtal condition
    
    * fix: since logic
    
    * fix: remove LatestState
    
    * fix: remove some isIncremental
    
    ---------
    
    Co-authored-by: Klesh Wong <[email protected]>
---
 .../pluginhelper/api/api_collector_with_state.go   | 94 ++++++++++++----------
 backend/plugins/bitbucket/tasks/api_common.go      | 23 +++---
 backend/plugins/bitbucket/tasks/issue_collector.go |  1 -
 .../bitbucket/tasks/issue_comment_collector.go     |  1 -
 .../plugins/bitbucket/tasks/pipeline_collector.go  |  1 -
 .../bitbucket/tasks/pipeline_steps_collector.go    |  1 -
 backend/plugins/bitbucket/tasks/pr_collector.go    |  1 -
 .../bitbucket/tasks/pr_comment_collector.go        |  1 -
 .../plugins/bitbucket/tasks/pr_commit_collector.go |  1 -
 backend/plugins/github/tasks/cicd_job_collector.go | 10 +--
 backend/plugins/github/tasks/comment_collector.go  | 12 +--
 backend/plugins/github/tasks/commit_collector.go   | 13 +--
 backend/plugins/github/tasks/issue_collector.go    | 13 +--
 .../plugins/github/tasks/pr_commit_collector.go    | 15 ++--
 .../plugins/github/tasks/pr_review_collector.go    | 13 ++-
 .../github/tasks/pr_review_comment_collector.go    | 15 +---
 .../github_graphql/tasks/issue_collector.go        |  8 +-
 .../plugins/github_graphql/tasks/job_collector.go  |  8 +-
 .../plugins/github_graphql/tasks/pr_collector.go   |  4 +-
 backend/plugins/gitlab/tasks/issue_collector.go    | 13 +--
 backend/plugins/gitlab/tasks/mr_collector.go       |  9 +--
 .../plugins/gitlab/tasks/mr_detail_collector.go    |  7 +-
 backend/plugins/gitlab/tasks/mr_note_collector.go  |  3 -
 backend/plugins/gitlab/tasks/pipeline_collector.go | 10 +--
 .../gitlab/tasks/pipeline_detail_collector.go      |  7 +-
 backend/plugins/gitlab/tasks/shared.go             |  4 +-
 backend/plugins/jenkins/tasks/stage_collector.go   |  7 +-
 .../jira/tasks/development_panel_collector.go      | 14 +---
 .../jira/tasks/issue_changelog_collector.go        |  9 +--
 backend/plugins/jira/tasks/issue_collector.go      | 29 +++----
 backend/plugins/jira/tasks/issue_collector_test.go | 56 ++-----------
 .../plugins/jira/tasks/issue_comment_collector.go  | 10 +--
 backend/plugins/jira/tasks/remotelink_collector.go | 10 +--
 backend/plugins/jira/tasks/worklog_collector.go    | 17 ++--
 .../plugins/tapd/tasks/bug_changelog_collector.go  |  9 +--
 backend/plugins/tapd/tasks/bug_collector.go        | 10 +--
 backend/plugins/tapd/tasks/bug_commit_collector.go | 10 +--
 backend/plugins/tapd/tasks/iteration_collector.go  | 10 +--
 backend/plugins/tapd/tasks/story_bug_collector.go  | 10 +--
 .../tapd/tasks/story_changelog_collector.go        |  9 +--
 backend/plugins/tapd/tasks/story_collector.go      | 10 +--
 .../plugins/tapd/tasks/story_commit_collector.go   |  9 +--
 .../plugins/tapd/tasks/task_changelog_collector.go |  9 +--
 backend/plugins/tapd/tasks/task_collector.go       | 10 +--
 .../plugins/tapd/tasks/task_commit_collector.go    |  9 +--
 backend/plugins/tapd/tasks/worklog_collector.go    | 10 +--
 .../plugins/zentao/tasks/bug_commits_collector.go  | 10 +--
 .../zentao/tasks/story_commits_collector.go        | 10 +--
 .../plugins/zentao/tasks/task_commits_collector.go | 10 +--
 49 files changed, 184 insertions(+), 401 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go 
b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 813a75f73..25471612b 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -36,9 +36,9 @@ type ApiCollectorStateManager struct {
        // *ApiCollector
        // *GraphqlCollector
        subtasks     []plugin.SubTask
-       LatestState  models.CollectorLatestState
-       TimeAfter    *time.Time
-       ExecuteStart time.Time
+       newState     models.CollectorLatestState
+       IsIncreamtal bool
+       Since        *time.Time
 }
 
 // NewStatefulApiCollector create a new ApiCollectorStateManager
@@ -49,11 +49,13 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs) 
(*ApiCollectorStateManager
        if err != nil {
                return nil, errors.Default.Wrap(err, "Couldn't resolve raw 
subtask args")
        }
-       latestState := models.CollectorLatestState{}
-       err = db.First(&latestState, dal.Where(`raw_data_table = ? AND 
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
+
+       // CollectorLatestState retrieves the latest collector state from the 
database
+       oldState := models.CollectorLatestState{}
+       err = db.First(&oldState, dal.Where(`raw_data_table = ? AND 
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
        if err != nil {
                if db.IsErrorNotFound(err) {
-                       latestState = models.CollectorLatestState{
+                       oldState = models.CollectorLatestState{
                                RawDataTable:  rawDataSubTask.table,
                                RawDataParams: rawDataSubTask.params,
                        }
@@ -61,42 +63,57 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs) 
(*ApiCollectorStateManager
                        return nil, errors.Default.Wrap(err, "failed to load 
JiraLatestCollectorMeta")
                }
        }
-       var timeAfter *time.Time
+       // Extract timeAfter and latestSuccessStart from old state
+       oldTimeAfter := oldState.TimeAfter
+       oldLatestSuccessStart := oldState.LatestSuccessStart
+
+       // Calculate incremental and since based on syncPolicy and old state
        syncPolicy := args.Ctx.TaskContext().SyncPolicy()
-       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
-               timeAfter = syncPolicy.TimeAfter
+       var isIncreamtal bool
+       var since *time.Time
+
+       if syncPolicy == nil {
+               // 1. If no syncPolicy, incremental and since is 
oldState.LatestSuccessStart
+               isIncreamtal = true
+               since = oldLatestSuccessStart
+       } else if oldLatestSuccessStart == nil {
+               // 2. If no oldState.LatestSuccessStart, not incremental and 
since is syncPolicy.TimeAfter
+               isIncreamtal = false
+               since = syncPolicy.TimeAfter
+       } else if syncPolicy.FullSync {
+               // 3. If fullSync true, not incremental and since is 
syncPolicy.TimeAfter
+               isIncreamtal = false
+               since = syncPolicy.TimeAfter
+       } else if syncPolicy.TimeAfter != nil {
+               // 4. If syncPolicy.TimeAfter not nil
+               if oldTimeAfter != nil && 
syncPolicy.TimeAfter.Before(*oldTimeAfter) {
+                       // 4.1 If oldTimeAfter not nil and syncPolicy.TimeAfter 
before oldTimeAfter, incremental is false and since is syncPolicy.TimeAfter
+                       isIncreamtal = false
+                       since = syncPolicy.TimeAfter
+               } else {
+                       // 4.2 If oldTimeAfter nil or syncPolicy.TimeAfter 
after oldTimeAfter, incremental is true and since is oldState.LatestSuccessStart
+                       isIncreamtal = true
+                       since = oldLatestSuccessStart
+               }
        }
+
+       currentTime := time.Now()
+       oldState.LatestSuccessStart = &currentTime
+       oldState.TimeAfter = syncPolicy.TimeAfter
+
        return &ApiCollectorStateManager{
                RawDataSubTaskArgs: args,
-               LatestState:        latestState,
-               TimeAfter:          timeAfter,
-               ExecuteStart:       time.Now(),
+               newState:           oldState,
+               IsIncreamtal:       isIncreamtal,
+               Since:              since,
        }, nil
-}
 
-// IsIncremental indicates if the collector should operate in incremental mode
-func (m *ApiCollectorStateManager) IsIncremental() bool {
-       prevSyncTime := m.LatestState.LatestSuccessStart
-       prevTimeAfter := m.LatestState.TimeAfter
-       syncPolicy := m.Ctx.TaskContext().SyncPolicy()
-       if syncPolicy != nil && syncPolicy.FullSync {
-               return false
-       }
-
-       if prevSyncTime == nil {
-               return false
-       }
-       // if we cleared the timeAfter, or moved timeAfter back in time, we 
should do a full sync
-       currTimeAfter := syncPolicy.TimeAfter
-       if currTimeAfter != nil {
-               return prevTimeAfter == nil || 
!currTimeAfter.Before(*prevTimeAfter)
-       }
-       return prevTimeAfter == nil
 }
 
 // InitCollector init the embedded collector
 func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) 
errors.Error {
        args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
+       args.Incremental = m.IsIncreamtal
        apiCollector, err := NewApiCollector(args)
        if err != nil {
                return err
@@ -126,10 +143,7 @@ func (m *ApiCollectorStateManager) Execute() errors.Error {
        }
 
        db := m.Ctx.GetDal()
-       m.LatestState.LatestSuccessStart = &m.ExecuteStart
-       m.LatestState.TimeAfter = m.TimeAfter
-
-       return db.CreateOrUpdate(&m.LatestState)
+       return db.CreateOrUpdate(&m.newState)
 }
 
 // NewStatefulApiCollectorForFinalizableEntity aims to add timeFilter/diffSync 
support for
@@ -165,14 +179,8 @@ func NewStatefulApiCollectorForFinalizableEntity(args 
FinalizableApiCollectorArg
                return nil, err
        }
 
-       var isIncremental = manager.IsIncremental()
-       var createdAfter *time.Time
-       if isIncremental {
-               createdAfter = manager.LatestState.LatestSuccessStart
-       } else {
-               createdAfter = manager.TimeAfter
-       }
-
+       createdAfter := manager.Since
+       isIncremental := manager.IsIncreamtal
        // step 1: create a collector to collect newly added records
        err = manager.InitCollector(ApiCollectorArgs{
                ApiClient: args.ApiClient,
diff --git a/backend/plugins/bitbucket/tasks/api_common.go 
b/backend/plugins/bitbucket/tasks/api_common.go
index 284df3df2..7e2e76775 100644
--- a/backend/plugins/bitbucket/tasks/api_common.go
+++ b/backend/plugins/bitbucket/tasks/api_common.go
@@ -101,14 +101,10 @@ func GetQueryCreatedAndUpdated(fields string, 
collectorWithState *api.ApiCollect
                }
                query.Set("fields", fields)
                query.Set("sort", "created_on")
-               if collectorWithState.IsIncremental() {
-                       latestSuccessStart := 
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339)
-                       query.Set("q", fmt.Sprintf("updated_on>=%s", 
latestSuccessStart))
-               } else if collectorWithState.TimeAfter != nil {
-                       timeAfter := 
collectorWithState.TimeAfter.Format(time.RFC3339)
-                       query.Set("q", fmt.Sprintf("updated_on>=%s", timeAfter))
-               }
 
+               if collectorWithState.Since != nil {
+                       query.Set("q", fmt.Sprintf("updated_on>=%s", 
collectorWithState.Since.Format(time.RFC3339)))
+               }
                return query, nil
        }
 }
@@ -179,9 +175,10 @@ func GetPullRequestsIterator(taskCtx 
plugin.SubTaskContext, collectorWithState *
                        data.Options.FullName, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncremental() {
-               clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*collectorWithState.Since))
        }
+
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -202,8 +199,8 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *api.Ap
                        data.Options.FullName, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncremental() {
-               clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*collectorWithState.Since))
        }
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
@@ -225,8 +222,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *api
                        data.Options.FullName, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncremental() {
-               clauses = append(clauses, dal.Where("bitbucket_complete_on > 
?", *collectorWithState.LatestState.LatestSuccessStart))
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("bitbucket_complete_on > 
?", *collectorWithState.Since))
        }
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/bitbucket/tasks/issue_collector.go 
b/backend/plugins/bitbucket/tasks/issue_collector.go
index a5f2e4a41..3edec1bea 100644
--- a/backend/plugins/bitbucket/tasks/issue_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_collector.go
@@ -43,7 +43,6 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
-               Incremental: collectorWithState.IsIncremental(),
                UrlTemplate: "repositories/{{ .Params.FullName }}/issues",
                Query: GetQueryCreatedAndUpdated(
                        `values.type,values.id,values.links.self,`+
diff --git a/backend/plugins/bitbucket/tasks/issue_comment_collector.go 
b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
index 8e0036c4a..cdc9c2e3d 100644
--- a/backend/plugins/bitbucket/tasks/issue_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
@@ -50,7 +50,6 @@ func CollectApiIssueComments(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
-               Incremental: collectorWithState.IsIncremental(),
                Input:       iterator,
                UrlTemplate: "repositories/{{ .Params.FullName }}/issues/{{ 
.Input.BitbucketId }}/comments",
                Query: GetQueryFields(
diff --git a/backend/plugins/bitbucket/tasks/pipeline_collector.go 
b/backend/plugins/bitbucket/tasks/pipeline_collector.go
index e25861936..d11a7f243 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_collector.go
@@ -43,7 +43,6 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    50,
-               Incremental: collectorWithState.IsIncremental(),
                UrlTemplate: "repositories/{{ .Params.FullName }}/pipelines/",
                Query: GetQueryCreatedAndUpdated(
                        
`values.uuid,values.type,values.state.name,values.state.result.name,values.state.result.type,values.state.stage.name,values.state.stage.type,`+
diff --git a/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go 
b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
index 99cc1e39b..8c9d70aa7 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
@@ -52,7 +52,6 @@ func CollectPipelineSteps(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
-               Incremental: collectorWithState.IsIncremental(),
                Input:       iterator,
                UrlTemplate: "repositories/{{ .Params.FullName }}/pipelines/{{ 
.Input.BitbucketId }}/steps/",
                Query: GetQueryFields(
diff --git a/backend/plugins/bitbucket/tasks/pr_collector.go 
b/backend/plugins/bitbucket/tasks/pr_collector.go
index 7379cc1e4..7aa9641e9 100644
--- a/backend/plugins/bitbucket/tasks/pr_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_collector.go
@@ -46,7 +46,6 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    50,
-               Incremental: collectorWithState.IsIncremental(),
                UrlTemplate: "repositories/{{ .Params.FullName }}/pullrequests",
                Query: GetQueryCreatedAndUpdated(
                        
`values.id,values.comment_count,values.type,values.state,values.title,values.description,`+
diff --git a/backend/plugins/bitbucket/tasks/pr_comment_collector.go 
b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
index c4d539f85..ee7ccd228 100644
--- a/backend/plugins/bitbucket/tasks/pr_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
@@ -50,7 +50,6 @@ func CollectApiPullRequestsComments(taskCtx 
plugin.SubTaskContext) errors.Error
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
-               Incremental: collectorWithState.IsIncremental(),
                Input:       iterator,
                UrlTemplate: "repositories/{{ .Params.FullName 
}}/pullrequests/{{ .Input.BitbucketId }}/comments",
                Query: GetQueryFields(
diff --git a/backend/plugins/bitbucket/tasks/pr_commit_collector.go 
b/backend/plugins/bitbucket/tasks/pr_commit_collector.go
index 9d99da39c..8ed4a5a11 100644
--- a/backend/plugins/bitbucket/tasks/pr_commit_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_commit_collector.go
@@ -52,7 +52,6 @@ func CollectApiPullRequestCommits(taskCtx 
plugin.SubTaskContext) errors.Error {
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:             data.ApiClient,
                PageSize:              100,
-               Incremental:           collectorWithState.IsIncremental(),
                Input:                 iterator,
                UrlTemplate:           "repositories/{{ .Params.FullName 
}}/pullrequests/{{ .Input.BitbucketId }}/commits",
                GetNextPageCustomData: GetNextPageCustomData,
diff --git a/backend/plugins/github/tasks/cicd_job_collector.go 
b/backend/plugins/github/tasks/cicd_job_collector.go
index 73fa2ab0d..93617f779 100644
--- a/backend/plugins/github/tasks/cicd_job_collector.go
+++ b/backend/plugins/github/tasks/cicd_job_collector.go
@@ -73,13 +73,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error 
{
                        data.Options.GithubId, data.Options.ConnectionId,
                ),
        }
-       // incremental collection
-       incremental := collectorWithState.IsIncremental()
-       if incremental {
-               clauses = append(
-                       clauses,
-                       dal.Where("github_updated_at > ?", 
collectorWithState.LatestState.LatestSuccessStart),
-               )
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("github_updated_at > ?", 
collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -102,7 +97,6 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error 
{
                ApiClient:   data.ApiClient,
                PageSize:    100,
                Input:       iterator,
-               Incremental: incremental,
                UrlTemplate: "repos/{{ .Params.Name }}/actions/runs/{{ 
.Input.ID }}/jobs",
                Query: func(reqData *api.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
diff --git a/backend/plugins/github/tasks/comment_collector.go 
b/backend/plugins/github/tasks/comment_collector.go
index 197a1f7a0..73dfad834 100644
--- a/backend/plugins/github/tasks/comment_collector.go
+++ b/backend/plugins/github/tasks/comment_collector.go
@@ -58,23 +58,15 @@ func CollectApiComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
-               Incremental: incremental,
-
                UrlTemplate: "repos/{{ .Params.Name }}/issues/comments",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("state", "all")
-                       if collectorWithState.TimeAfter != nil {
-                               // Note that `since` is for filtering records 
by the `updated` time
-                               query.Set("since", 
collectorWithState.TimeAfter.String())
-                       }
-                       // if incremental == true, we overwrite it
-                       if incremental {
-                               query.Set("since", 
collectorWithState.LatestState.LatestSuccessStart.String())
+                       if collectorWithState.Since != nil {
+                               query.Set("since", 
collectorWithState.Since.String())
                        }
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("direction", "asc")
diff --git a/backend/plugins/github/tasks/commit_collector.go 
b/backend/plugins/github/tasks/commit_collector.go
index ca37c6227..5a9be44db 100644
--- a/backend/plugins/github/tasks/commit_collector.go
+++ b/backend/plugins/github/tasks/commit_collector.go
@@ -58,11 +58,9 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               Incremental: incremental,
+               ApiClient: data.ApiClient,
+               PageSize:  100,
                /*
                        url may use arbitrary variables from different source 
in any order, we need GoTemplate to allow more
                        flexible for all kinds of possibility.
@@ -79,11 +77,8 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("state", "all")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("since", 
collectorWithState.TimeAfter.String())
-                       }
-                       if incremental {
-                               query.Set("since", 
collectorWithState.LatestState.LatestSuccessStart.String())
+                       if collectorWithState.Since != nil {
+                               query.Set("since", 
collectorWithState.Since.String())
                        }
                        query.Set("direction", "asc")
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
diff --git a/backend/plugins/github/tasks/issue_collector.go 
b/backend/plugins/github/tasks/issue_collector.go
index c27eaea86..67d2ce506 100644
--- a/backend/plugins/github/tasks/issue_collector.go
+++ b/backend/plugins/github/tasks/issue_collector.go
@@ -58,11 +58,9 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               Incremental: incremental,
+               ApiClient: data.ApiClient,
+               PageSize:  100,
                /*
                        url may use arbitrary variables from different source 
in any order, we need GoTemplate to allow more
                        flexible for all kinds of possibility.
@@ -79,11 +77,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("state", "all")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("since", 
collectorWithState.TimeAfter.String())
-                       }
-                       if incremental {
-                               query.Set("since", 
collectorWithState.LatestState.LatestSuccessStart.String())
+                       if collectorWithState.Since != nil {
+                               query.Set("since", 
collectorWithState.Since.String())
                        }
                        query.Set("direction", "asc")
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
diff --git a/backend/plugins/github/tasks/pr_commit_collector.go 
b/backend/plugins/github/tasks/pr_commit_collector.go
index 0a209ff2e..77c0e2a26 100644
--- a/backend/plugins/github/tasks/pr_commit_collector.go
+++ b/backend/plugins/github/tasks/pr_commit_collector.go
@@ -73,20 +73,18 @@ func CollectApiPullRequestCommits(taskCtx 
plugin.SubTaskContext) errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
-
        clauses := []dal.Clause{
                dal.Select("number, github_id"),
                dal.From(models.GithubPullRequest{}.TableName()),
                dal.Where("repo_id = ? and connection_id=?", 
data.Options.GithubId, data.Options.ConnectionId),
        }
-       // incremental collection, no need to care about the timeFilter since 
it has to be collected by PR
-       if incremental {
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
                clauses = append(
                        clauses,
-                       dal.Where("github_updated_at > ?", 
collectorWithState.LatestState.LatestSuccessStart),
+                       dal.Where("github_updated_at > ?", 
collectorWithState.Since),
                )
        }
+
        cursor, err := db.Cursor(
                clauses...,
        )
@@ -98,10 +96,9 @@ func CollectApiPullRequestCommits(taskCtx 
plugin.SubTaskContext) errors.Error {
                return err
        }
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               Incremental: incremental,
-               Input:       iterator,
+               ApiClient: data.ApiClient,
+               PageSize:  100,
+               Input:     iterator,
 
                UrlTemplate: "repos/{{ .Params.Name }}/pulls/{{ .Input.Number 
}}/commits",
 
diff --git a/backend/plugins/github/tasks/pr_review_collector.go 
b/backend/plugins/github/tasks/pr_review_collector.go
index 18bbde6b3..6cc8d8ce9 100644
--- a/backend/plugins/github/tasks/pr_review_collector.go
+++ b/backend/plugins/github/tasks/pr_review_collector.go
@@ -65,18 +65,18 @@ func CollectApiPullRequestReviews(taskCtx 
plugin.SubTaskContext) errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        clauses := []dal.Clause{
                dal.Select("number, github_id"),
                dal.From(models.GithubPullRequest{}.TableName()),
                dal.Where("repo_id = ? and connection_id=?", 
data.Options.GithubId, data.Options.ConnectionId),
        }
-       if incremental {
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
                clauses = append(
                        clauses,
-                       dal.Where("github_updated_at > ?", 
collectorWithState.LatestState.LatestSuccessStart),
+                       dal.Where("github_updated_at > ?", 
collectorWithState.Since),
                )
        }
+
        cursor, err := db.Cursor(
                clauses...,
        )
@@ -90,10 +90,9 @@ func CollectApiPullRequestReviews(taskCtx 
plugin.SubTaskContext) errors.Error {
        }
 
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               Incremental: incremental,
-               Input:       iterator,
+               ApiClient: data.ApiClient,
+               PageSize:  100,
+               Input:     iterator,
 
                UrlTemplate: "repos/{{ .Params.Name }}/pulls/{{ .Input.Number 
}}/reviews",
 
diff --git a/backend/plugins/github/tasks/pr_review_comment_collector.go 
b/backend/plugins/github/tasks/pr_review_comment_collector.go
index eb3720ce0..e05ae2812 100644
--- a/backend/plugins/github/tasks/pr_review_comment_collector.go
+++ b/backend/plugins/github/tasks/pr_review_comment_collector.go
@@ -61,11 +61,9 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               Incremental: incremental,
+               ApiClient: data.ApiClient,
+               PageSize:  100,
                Header: func(reqData *helper.RequestData) (http.Header, 
errors.Error) {
                        // Adding -H "Accept: application/vnd.github+json" 
solve the issue of getting 502/403 error
                        header := http.Header{}
@@ -76,13 +74,8 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                UrlTemplate: "repos/{{ .Params.Name }}/pulls/comments",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       if collectorWithState.TimeAfter != nil {
-                               // Note that `since` is for filtering records 
by the `updated` time
-                               query.Set("since", 
collectorWithState.TimeAfter.String())
-                       }
-                       // if incremental == true, we overwrite it
-                       if incremental {
-                               query.Set("since", 
collectorWithState.LatestState.LatestSuccessStart.String())
+                       if collectorWithState.Since != nil {
+                               query.Set("since", 
collectorWithState.Since.String())
                        }
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("direction", "asc")
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go 
b/backend/plugins/github_graphql/tasks/issue_collector.go
index 7645db8d9..cfb0459e4 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -109,21 +109,17 @@ func CollectIssue(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = 
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
                GraphqlClient: data.GraphqlClient,
                PageSize:      100,
-               Incremental:   incremental,
                BuildQuery: func(reqData *helper.GraphqlRequestData) 
(interface{}, map[string]interface{}, error) {
                        query := &GraphqlQueryIssueWrapper{}
                        if reqData == nil {
                                return query, map[string]interface{}{}, nil
                        }
                        since := helper.DateTime{}
-                       if incremental {
-                               since = helper.DateTime{Time: 
*collectorWithState.LatestState.LatestSuccessStart}
-                       } else if collectorWithState.TimeAfter != nil {
-                               since = helper.DateTime{Time: 
*collectorWithState.TimeAfter}
+                       if collectorWithState.Since != nil {
+                               since = helper.DateTime{Time: 
*collectorWithState.Since}
                        }
                        ownerName := strings.Split(data.Options.Name, "/")
                        variables := map[string]interface{}{
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go 
b/backend/plugins/github_graphql/tasks/job_collector.go
index 095276c4a..41836d3a8 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -116,17 +116,16 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
-
        clauses := []dal.Clause{
                dal.Select("check_suite_node_id"),
                dal.From(models.GithubRun{}.TableName()),
                dal.Where("repo_id = ? and connection_id=?", 
data.Options.GithubId, data.Options.ConnectionId),
                dal.Orderby("github_updated_at DESC"),
        }
-       if incremental {
-               clauses = append(clauses, dal.Where("github_updated_at > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("github_updated_at > ?", 
*collectorWithState.Since))
        }
+
        cursor, err := db.Cursor(
                clauses...,
        )
@@ -142,7 +141,6 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = 
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
                Input:         iterator,
                InputStep:     20,
-               Incremental:   incremental,
                GraphqlClient: data.GraphqlClient,
                BuildQuery: func(reqData *helper.GraphqlRequestData) 
(interface{}, map[string]interface{}, error) {
                        query := &GraphqlQueryCheckRunWrapper{}
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go 
b/backend/plugins/github_graphql/tasks/pr_collector.go
index afd8ff0e8..37d10e742 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -160,11 +160,9 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error 
{
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitGraphQLCollector(api.GraphqlCollectorArgs{
                GraphqlClient: data.GraphqlClient,
                PageSize:      10,
-               Incremental:   incremental,
                /*
                        (Optional) Return query string for request, or you can 
plug them into UrlTemplate directly
                */
@@ -194,7 +192,7 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
                        isFinish := false
                        for _, rawL := range prs {
                                // collect data even though in increment mode 
because of updating existing data
-                               if collectorWithState.TimeAfter != nil && 
!collectorWithState.TimeAfter.Before(rawL.UpdatedAt) {
+                               if collectorWithState.Since != nil && 
!collectorWithState.Since.Before(rawL.UpdatedAt) {
                                        isFinish = true
                                        break
                                }
diff --git a/backend/plugins/gitlab/tasks/issue_collector.go 
b/backend/plugins/gitlab/tasks/issue_collector.go
index 0829f9cde..81c78be37 100644
--- a/backend/plugins/gitlab/tasks/issue_collector.go
+++ b/backend/plugins/gitlab/tasks/issue_collector.go
@@ -51,11 +51,9 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               Incremental: incremental,
+               ApiClient: data.ApiClient,
+               PageSize:  100,
 
                UrlTemplate: "projects/{{ .Params.ProjectId }}/issues",
                /*
@@ -63,11 +61,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                */
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("updated_after", 
collectorWithState.TimeAfter.Format(time.RFC3339))
-                       }
-                       if incremental {
-                               query.Set("updated_after", 
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
+                       if collectorWithState.Since != nil {
+                               query.Set("updated_after", 
collectorWithState.Since.Format(time.RFC3339))
                        }
                        query.Set("sort", "asc")
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
diff --git a/backend/plugins/gitlab/tasks/mr_collector.go 
b/backend/plugins/gitlab/tasks/mr_collector.go
index c55fe760e..ce0f21b3c 100644
--- a/backend/plugins/gitlab/tasks/mr_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_collector.go
@@ -48,11 +48,9 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:      data.ApiClient,
                PageSize:       100,
-               Incremental:    incremental,
                UrlTemplate:    "projects/{{ .Params.ProjectId 
}}/merge_requests",
                GetTotalPages:  GetTotalPagesFromResponse,
                ResponseParser: GetRawMessageFromResponse,
@@ -61,11 +59,8 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
                        if err != nil {
                                return nil, err
                        }
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("updated_after", 
collectorWithState.TimeAfter.Format(time.RFC3339))
-                       }
-                       if incremental {
-                               query.Set("updated_after", 
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
+                       if collectorWithState.Since != nil {
+                               query.Set("updated_after", 
collectorWithState.Since.Format(time.RFC3339))
                        }
                        return query, nil
                },
diff --git a/backend/plugins/gitlab/tasks/mr_detail_collector.go 
b/backend/plugins/gitlab/tasks/mr_detail_collector.go
index 5b9af48dc..f60871a24 100644
--- a/backend/plugins/gitlab/tasks/mr_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_detail_collector.go
@@ -85,11 +85,10 @@ func GetMergeRequestDetailsIterator(taskCtx 
plugin.SubTaskContext, collectorWith
                        data.Options.ProjectId, data.Options.ConnectionId, true,
                ),
        }
-       if collectorWithState.LatestState.LatestSuccessStart != nil {
-               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
-       } else if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.TimeAfter))
+       if collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.Since))
        }
+
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
        if err != nil {
diff --git a/backend/plugins/gitlab/tasks/mr_note_collector.go 
b/backend/plugins/gitlab/tasks/mr_note_collector.go
index d6fde5751..282135982 100644
--- a/backend/plugins/gitlab/tasks/mr_note_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_note_collector.go
@@ -51,12 +51,9 @@ func CollectApiMergeRequestsNotes(taskCtx 
plugin.SubTaskContext) errors.Error {
        }
        defer iterator.Close()
 
-       incremental := collectorWithState.IsIncremental()
-
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:      data.ApiClient,
                PageSize:       100,
-               Incremental:    incremental,
                Input:          iterator,
                UrlTemplate:    "projects/{{ .Params.ProjectId 
}}/merge_requests/{{ .Input.Iid }}/notes?system=false",
                Query:          GetQuery,
diff --git a/backend/plugins/gitlab/tasks/pipeline_collector.go 
b/backend/plugins/gitlab/tasks/pipeline_collector.go
index c22cc872a..06a14c925 100644
--- a/backend/plugins/gitlab/tasks/pipeline_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_collector.go
@@ -53,21 +53,17 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
+
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
                ApiClient:          data.ApiClient,
                MinTickInterval:    &tickInterval,
                PageSize:           100,
-               Incremental:        incremental,
                UrlTemplate:        "projects/{{ .Params.ProjectId 
}}/pipelines",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("updated_after", 
collectorWithState.TimeAfter.Format(time.RFC3339))
-                       }
-                       if incremental {
-                               query.Set("updated_after", 
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
+                       if collectorWithState.Since != nil {
+                               query.Set("updated_after", 
collectorWithState.Since.Format(time.RFC3339))
                        }
                        query.Set("with_stats", "true")
                        query.Set("sort", "asc")
diff --git a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go 
b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
index a37807b8c..aa4c80148 100644
--- a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
@@ -55,8 +55,6 @@ func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.IsIncremental()
-
        iterator, err := GetPipelinesIterator(taskCtx, collectorWithState)
        if err != nil {
                return err
@@ -68,7 +66,6 @@ func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext) 
errors.Error {
                ApiClient:          data.ApiClient,
                MinTickInterval:    &tickInterval,
                Input:              iterator,
-               Incremental:        incremental,
                UrlTemplate:        "projects/{{ .Params.ProjectId 
}}/pipelines/{{ .Input.GitlabId }}",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
@@ -96,8 +93,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *hel
                        data.Options.ProjectId, data.Options.ConnectionId, true,
                ),
        }
-       if collectorWithState.LatestState.LatestSuccessStart != nil {
-               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
+       if collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.Since))
        }
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/gitlab/tasks/shared.go 
b/backend/plugins/gitlab/tasks/shared.go
index 06cc46225..7f1e3e8c1 100644
--- a/backend/plugins/gitlab/tasks/shared.go
+++ b/backend/plugins/gitlab/tasks/shared.go
@@ -159,8 +159,8 @@ func GetMergeRequestsIterator(taskCtx 
plugin.SubTaskContext, collectorWithState
                ),
        }
        if collectorWithState != nil {
-               if collectorWithState.LatestState.LatestSuccessStart != nil {
-                       clauses = append(clauses, dal.Where("gitlab_updated_at 
> ?", *collectorWithState.LatestState.LatestSuccessStart))
+               if collectorWithState.Since != nil {
+                       clauses = append(clauses, dal.Where("gitlab_updated_at 
> ?", *collectorWithState.Since))
                }
        }
        // construct the input iterator
diff --git a/backend/plugins/jenkins/tasks/stage_collector.go 
b/backend/plugins/jenkins/tasks/stage_collector.go
index bf159c1d6..47530c95e 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -67,12 +67,9 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and 
tjb.job_name = ? and tjb.class = ?`,
                        data.Options.ConnectionId, data.Options.JobPath, 
data.Options.JobName, "WorkflowRun"),
        }
-
-       incremental := collectorWithState.IsIncremental()
-       if incremental && collectorWithState.LatestState.LatestSuccessStart != 
nil {
-               clauses = append(clauses, dal.Where(`tjb.start_time >= ?`, 
collectorWithState.LatestState.LatestSuccessStart))
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where(`tjb.start_time >= ?`, 
collectorWithState.Since))
        }
-
        cursor, err := db.Cursor(clauses...)
        if err != nil {
                return err
diff --git a/backend/plugins/jira/tasks/development_panel_collector.go 
b/backend/plugins/jira/tasks/development_panel_collector.go
index 278052cdc..64a7ea372 100644
--- a/backend/plugins/jira/tasks/development_panel_collector.go
+++ b/backend/plugins/jira/tasks/development_panel_collector.go
@@ -72,14 +72,9 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ?", 
data.Options.ConnectionId, data.Options.BoardId),
        }
-       incremental := collectorWithState.IsIncremental()
-       if incremental && collectorWithState.LatestState.LatestSuccessStart != 
nil {
-               clauses = append(
-                       clauses,
-                       dal.Where("i.updated > ?", 
collectorWithState.LatestState.LatestSuccessStart),
-               )
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
        }
-
        cursor, err := db.Cursor(clauses...)
        if err != nil {
                logger.Error(err, "collect development panel error")
@@ -93,9 +88,8 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
 
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               Input:       iterator,
-               Incremental: incremental,
+               ApiClient: data.ApiClient,
+               Input:     iterator,
                // the URL looks like:
                // 
https://merico.atlassian.net/rest/dev-status/1.0/issue/detail?issueId=25184&applicationType=GitLab&dataType=repository
                UrlTemplate: "dev-status/1.0/issue/detail",
diff --git a/backend/plugins/jira/tasks/issue_changelog_collector.go 
b/backend/plugins/jira/tasks/issue_changelog_collector.go
index 7b5fc7674..bcbf46dc0 100644
--- a/backend/plugins/jira/tasks/issue_changelog_collector.go
+++ b/backend/plugins/jira/tasks/issue_changelog_collector.go
@@ -71,12 +71,8 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ? AND 
i.std_type != ? and i.changelog_total > 100", data.Options.ConnectionId, 
data.Options.BoardId, "Epic"),
        }
-       incremental := collectorWithState.IsIncremental()
-       if incremental && collectorWithState.LatestState.LatestSuccessStart != 
nil {
-               clauses = append(
-                       clauses,
-                       dal.Where("i.updated > ?", 
collectorWithState.LatestState.LatestSuccessStart),
-               )
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
        }
 
        if logger.IsLevelEnabled(log.LOG_DEBUG) {
@@ -102,7 +98,6 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:     data.ApiClient,
                PageSize:      100,
-               Incremental:   incremental,
                GetTotalPages: GetTotalPagesFromResponse,
                Input:         iterator,
                UrlTemplate:   "api/3/issue/{{ .Input.IssueId }}/changelog",
diff --git a/backend/plugins/jira/tasks/issue_collector.go 
b/backend/plugins/jira/tasks/issue_collector.go
index b00cd4ff3..3faaa9d51 100644
--- a/backend/plugins/jira/tasks/issue_collector.go
+++ b/backend/plugins/jira/tasks/issue_collector.go
@@ -70,19 +70,20 @@ func CollectIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
        // build jql
        // IMPORTANT: we have to keep paginated data in a consistence order to 
avoid data-missing, if we sort issues by
        //  `updated`, issue will be jumping between pages if it got updated 
during the collection process
-       incremental := collectorWithState.IsIncremental()
        loc, err := getTimeZone(taskCtx)
        if err != nil {
                logger.Info("failed to get timezone, err: %v", err)
        } else {
                logger.Info("got user's timezone: %v", loc.String())
        }
-       jql := buildJQL(collectorWithState.TimeAfter, 
collectorWithState.LatestState.LatestSuccessStart, incremental, loc)
+       jql := "ORDER BY created ASC"
+       if collectorWithState.Since != nil {
+               jql = buildJQL(*collectorWithState.Since, loc)
+       }
 
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    data.Options.PageSize,
-               Incremental: incremental,
+               ApiClient: data.ApiClient,
+               PageSize:  data.Options.PageSize,
                /*
                        url may use arbitrary variables from different 
connection in any order, we need GoTemplate to allow more
                        flexible for all kinds of possibility.
@@ -146,23 +147,15 @@ func CollectIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
 }
 
 // buildJQL build jql based on timeAfter and incremental mode
-func buildJQL(timeAfter, latestSuccessStart *time.Time, isIncremental bool, 
location *time.Location) string {
+func buildJQL(since time.Time, location *time.Location) string {
        jql := "ORDER BY created ASC"
-       var moment time.Time
-       if timeAfter != nil {
-               moment = *timeAfter
-       }
-       // if isIncremental is true, we should not collect data before 
latestSuccessStart
-       if isIncremental && latestSuccessStart.After(moment) {
-               moment = *latestSuccessStart
-       }
-       if !moment.IsZero() {
+       if !since.IsZero() {
                if location != nil {
-                       moment = moment.In(location)
+                       since = since.In(location)
                } else {
-                       moment = moment.In(time.UTC).Add(-24 * time.Hour)
+                       since = since.In(time.UTC).Add(-24 * time.Hour)
                }
-               jql = fmt.Sprintf("updated >= '%s' %s", 
moment.Format("2006/01/02 15:04"), jql)
+               jql = fmt.Sprintf("updated >= '%s' %s", 
since.Format("2006/01/02 15:04"), jql)
        }
        return jql
 }
diff --git a/backend/plugins/jira/tasks/issue_collector_test.go 
b/backend/plugins/jira/tasks/issue_collector_test.go
index c9891943d..99bf5a533 100644
--- a/backend/plugins/jira/tasks/issue_collector_test.go
+++ b/backend/plugins/jira/tasks/issue_collector_test.go
@@ -26,13 +26,10 @@ func Test_buildJQL(t *testing.T) {
        base := time.Date(2021, 2, 3, 4, 5, 6, 7, time.UTC)
        timeAfter := base
        add48 := base.Add(48 * time.Hour)
-       minus48 := base.Add(-48 * time.Hour)
        loc, _ := time.LoadLocation("Asia/Shanghai")
        type args struct {
-               timeAfter          *time.Time
-               latestSuccessStart *time.Time
-               isIncremental      bool
-               location           *time.Location
+               since    *time.Time
+               location *time.Location
        }
        tests := []struct {
                name string
@@ -42,56 +39,15 @@ func Test_buildJQL(t *testing.T) {
                {
                        name: "test incremental",
                        args: args{
-                               timeAfter:          nil,
-                               latestSuccessStart: nil,
-                               isIncremental:      false,
-                       },
-                       want: "ORDER BY created ASC"},
-               {
-                       name: "test incremental",
-                       args: args{
-                               timeAfter:          nil,
-                               latestSuccessStart: &add48,
-                               isIncremental:      true,
-                               location:           loc,
-                       },
-                       want: "updated >= '2021/02/05 12:05' ORDER BY created 
ASC",
-               },
-               {
-                       name: "test incremental",
-                       args: args{
-                               timeAfter:          &base,
-                               latestSuccessStart: nil,
-                               isIncremental:      false,
-                               location:           loc,
-                       },
-                       want: "updated >= '2021/02/03 12:05' ORDER BY created 
ASC",
-               },
-               {
-                       name: "test incremental",
-                       args: args{
-                               timeAfter:          &timeAfter,
-                               latestSuccessStart: &add48,
-                               isIncremental:      true,
-                       },
-                       want: "updated >= '2021/02/04 04:05' ORDER BY created 
ASC",
-               },
-               {
-                       name: "test incremental",
-                       args: args{
-                               timeAfter:          &timeAfter,
-                               latestSuccessStart: &add48,
-                               isIncremental:      true,
-                               location:           loc,
+                               since:    &add48,
+                               location: loc,
                        },
                        want: "updated >= '2021/02/05 12:05' ORDER BY created 
ASC",
                },
                {
                        name: "test incremental",
                        args: args{
-                               timeAfter:          &timeAfter,
-                               latestSuccessStart: &minus48,
-                               isIncremental:      true,
+                               since: &timeAfter,
                        },
                        want: "updated >= '2021/02/02 04:05' ORDER BY created 
ASC",
                },
@@ -99,7 +55,7 @@ func Test_buildJQL(t *testing.T) {
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       if got := buildJQL(tt.args.timeAfter, 
tt.args.latestSuccessStart, tt.args.isIncremental, tt.args.location); got != 
tt.want {
+                       if got := buildJQL(*tt.args.since, tt.args.location); 
got != tt.want {
                                t.Errorf("buildJQL() = %v, want %v", got, 
tt.want)
                        }
                })
diff --git a/backend/plugins/jira/tasks/issue_comment_collector.go 
b/backend/plugins/jira/tasks/issue_comment_collector.go
index fb0d2168c..71c91491e 100644
--- a/backend/plugins/jira/tasks/issue_comment_collector.go
+++ b/backend/plugins/jira/tasks/issue_comment_collector.go
@@ -71,14 +71,9 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ? AND 
i.std_type != ? AND i.comment_total > 100", data.Options.ConnectionId, 
data.Options.BoardId, "Epic"),
        }
-       incremental := collectorWithState.IsIncremental()
-       if incremental && collectorWithState.LatestState.LatestSuccessStart != 
nil {
-               clauses = append(
-                       clauses,
-                       dal.Where("i.updated > ?", 
collectorWithState.LatestState.LatestSuccessStart),
-               )
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
        }
-
        if logger.IsLevelEnabled(log.LOG_DEBUG) {
                count, err := db.Count(clauses...)
                if err != nil {
@@ -102,7 +97,6 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:     data.ApiClient,
                PageSize:      100,
-               Incremental:   incremental,
                GetTotalPages: GetTotalPagesFromResponse,
                Input:         iterator,
                UrlTemplate:   "api/3/issue/{{ .Input.IssueId }}/comment",
diff --git a/backend/plugins/jira/tasks/remotelink_collector.go 
b/backend/plugins/jira/tasks/remotelink_collector.go
index 158640d5c..450723ec2 100644
--- a/backend/plugins/jira/tasks/remotelink_collector.go
+++ b/backend/plugins/jira/tasks/remotelink_collector.go
@@ -70,14 +70,9 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ?", 
data.Options.ConnectionId, data.Options.BoardId),
        }
-       incremental := collectorWithState.IsIncremental()
-       if incremental && collectorWithState.LatestState.LatestSuccessStart != 
nil {
-               clauses = append(
-                       clauses,
-                       dal.Where("i.updated > ?", 
collectorWithState.LatestState.LatestSuccessStart),
-               )
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
        }
-
        cursor, err := db.Cursor(clauses...)
        if err != nil {
                logger.Error(err, "collect remotelink error")
@@ -93,7 +88,6 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) 
errors.Error {
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                Input:       iterator,
-               Incremental: incremental,
                UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/remotelink",
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
                        if res.StatusCode == http.StatusNotFound {
diff --git a/backend/plugins/jira/tasks/worklog_collector.go 
b/backend/plugins/jira/tasks/worklog_collector.go
index d0feab119..71121597f 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -66,21 +66,19 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where("i.updated > i.created AND bi.connection_id = ?  AND 
bi.board_id = ?  ", data.Options.ConnectionId, data.Options.BoardId),
                dal.Groupby("i.issue_id, i.updated"),
        }
-       incremental := collectorWithState.IsIncremental()
-       if incremental {
-               clauses = append(clauses, dal.Having("i.updated > ? AND 
(i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND 
COUNT(wl.worklog_id) > 0))", collectorWithState.LatestState.LatestSuccessStart))
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Having("i.updated > ? AND 
(i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND 
COUNT(wl.worklog_id) > 0))", collectorWithState.Since))
        } else {
                /*
-                       i.updated > max(rl.issue_updated) was deleted because 
for non-incremental collection,
-                       max(rl.issue_updated) will only be one of null, less or 
equal to i.updated
-                       so i.updated > max(rl.issue_updated) is always false.
-                       max(c.issue_updated) IS NULL AND COUNT(c.worklog_id) > 
0 infers the issue has more than 100 worklogs,
+                       i.updated > max(wl.issue_updated) was deleted because 
for non-incremental collection,
+                       max(wl.issue_updated) will only be one of null, less or 
equal to i.updated
+                       so i.updated > max(wl.issue_updated) is always false.
+                       max(wl.issue_updated) IS NULL AND COUNT(wl.worklog_id) 
> 0 infers the issue has more than 100 worklogs,
                        because we collected worklogs when collecting issues, 
and assign worklog.issue_updated if num of worklogs < 100,
-                       and max(c.issue_updated) IS NULL AND 
COUNT(c.worklog_id) > 0 means all worklogs for the issue were not assigned 
issue_updated
+                       and max(wl.issue_updated) IS NULL AND 
COUNT(wl.worklog_id) > 0 means all worklogs for the issue were not assigned 
issue_updated
                */
                clauses = append(clauses, dal.Having("max(wl.issue_updated) IS 
NULL AND COUNT(wl.worklog_id) > 0"))
        }
-
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -96,7 +94,6 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                ApiClient:     data.ApiClient,
                UrlTemplate:   "api/2/issue/{{ .Input.IssueId }}/worklog",
                PageSize:      50,
-               Incremental:   incremental,
                GetTotalPages: GetTotalPagesFromResponse,
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
                        var data struct {
diff --git a/backend/plugins/tapd/tasks/bug_changelog_collector.go 
b/backend/plugins/tapd/tasks/bug_changelog_collector.go
index 264e45342..bf768e0d1 100644
--- a/backend/plugins/tapd/tasks/bug_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/bug_changelog_collector.go
@@ -38,10 +38,8 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
 
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "bug_changes",
@@ -51,11 +49,8 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created desc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+                       if collectorWithState.Since != nil {
+                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
diff --git a/backend/plugins/tapd/tasks/bug_collector.go 
b/backend/plugins/tapd/tasks/bug_collector.go
index b5c85f68a..746c1bd89 100644
--- a/backend/plugins/tapd/tasks/bug_collector.go
+++ b/backend/plugins/tapd/tasks/bug_collector.go
@@ -38,9 +38,8 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
+
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "bugs",
@@ -51,11 +50,8 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error 
{
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+                       if collectorWithState.Since != nil {
+                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
diff --git a/backend/plugins/tapd/tasks/bug_commit_collector.go 
b/backend/plugins/tapd/tasks/bug_commit_collector.go
index 97a1c2937..3164adb05 100644
--- a/backend/plugins/tapd/tasks/bug_commit_collector.go
+++ b/backend/plugins/tapd/tasks/bug_commit_collector.go
@@ -44,17 +44,14 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect issueCommits")
-       incremental := collectorWithState.IsIncremental()
+
        clauses := []dal.Clause{
                dal.Select("_tool_tapd_bugs.id as issue_id, modified as 
update_time"),
                dal.From(&models.TapdBug{}),
                dal.Where("_tool_tapd_bugs.connection_id = ? and 
_tool_tapd_bugs.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.TimeAfter))
-       }
-       if incremental {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
+       if collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -67,7 +64,6 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
-               Incremental: incremental,
                Input:       iterator,
                UrlTemplate: "code_commit_infos",
                Query: func(reqData *api.RequestData) (url.Values, 
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/iteration_collector.go 
b/backend/plugins/tapd/tasks/iteration_collector.go
index 3a6ebc182..5f5512b1f 100644
--- a/backend/plugins/tapd/tasks/iteration_collector.go
+++ b/backend/plugins/tapd/tasks/iteration_collector.go
@@ -40,9 +40,8 @@ func CollectIterations(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
+
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                Concurrency: 3,
@@ -53,11 +52,8 @@ func CollectIterations(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+                       if collectorWithState.Since != nil {
+                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
diff --git a/backend/plugins/tapd/tasks/story_bug_collector.go 
b/backend/plugins/tapd/tasks/story_bug_collector.go
index b9ac18dfa..9d5d0ed0c 100644
--- a/backend/plugins/tapd/tasks/story_bug_collector.go
+++ b/backend/plugins/tapd/tasks/story_bug_collector.go
@@ -42,17 +42,14 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect storyBugs")
-       incremental := collectorWithState.IsIncremental()
+
        clauses := []dal.Clause{
                dal.Select("id as issue_id, modified as update_time"),
                dal.From(&models.TapdStory{}),
                dal.Where("_tool_tapd_stories.connection_id = ? and 
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.TimeAfter))
-       }
-       if incremental {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
+       if collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -64,7 +61,6 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
-               Incremental: incremental,
                Input:       iterator,
                UrlTemplate: "stories/get_related_bugs",
                Query: func(reqData *api.RequestData) (url.Values, 
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/story_changelog_collector.go 
b/backend/plugins/tapd/tasks/story_changelog_collector.go
index 7aa863417..d0597b7ef 100644
--- a/backend/plugins/tapd/tasks/story_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/story_changelog_collector.go
@@ -38,10 +38,8 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect storyChangelogs")
-       incremental := collectorWithState.IsIncremental()
 
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "story_changes",
@@ -51,11 +49,8 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+                       if collectorWithState.Since != nil {
+                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
diff --git a/backend/plugins/tapd/tasks/story_collector.go 
b/backend/plugins/tapd/tasks/story_collector.go
index 211c7f268..095c72b3d 100644
--- a/backend/plugins/tapd/tasks/story_collector.go
+++ b/backend/plugins/tapd/tasks/story_collector.go
@@ -38,9 +38,8 @@ func CollectStorys(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
+
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "stories",
@@ -51,11 +50,8 @@ func CollectStorys(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+                       if collectorWithState.Since != nil {
+                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
diff --git a/backend/plugins/tapd/tasks/story_commit_collector.go 
b/backend/plugins/tapd/tasks/story_commit_collector.go
index b69c6fa96..708c71010 100644
--- a/backend/plugins/tapd/tasks/story_commit_collector.go
+++ b/backend/plugins/tapd/tasks/story_commit_collector.go
@@ -44,17 +44,13 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect issueCommits")
-       incremental := collectorWithState.IsIncremental()
        clauses := []dal.Clause{
                dal.Select("_tool_tapd_stories.id as issue_id, modified as 
update_time"),
                dal.From(&models.TapdStory{}),
                dal.Where("_tool_tapd_stories.connection_id = ? and 
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.TimeAfter))
-       }
-       if incremental {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
+       if collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -66,7 +62,6 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
-               Incremental: incremental,
                Input:       iterator,
                UrlTemplate: "code_commit_infos",
                Query: func(reqData *api.RequestData) (url.Values, 
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/task_changelog_collector.go 
b/backend/plugins/tapd/tasks/task_changelog_collector.go
index e4efec684..7aba00c2a 100644
--- a/backend/plugins/tapd/tasks/task_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/task_changelog_collector.go
@@ -38,9 +38,7 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect taskChangelogs")
-       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "task_changes",
@@ -50,11 +48,8 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+                       if collectorWithState.Since != nil {
+                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
diff --git a/backend/plugins/tapd/tasks/task_collector.go 
b/backend/plugins/tapd/tasks/task_collector.go
index 3511fff11..2d5bfdba3 100644
--- a/backend/plugins/tapd/tasks/task_collector.go
+++ b/backend/plugins/tapd/tasks/task_collector.go
@@ -38,10 +38,9 @@ func CollectTasks(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
+
        collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               Incremental:        incremental,
                ApiClient:          data.ApiClient,
                PageSize:           int(data.Options.PageSize),
                UrlTemplate:        "tasks",
@@ -52,11 +51,8 @@ func CollectTasks(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+                       if collectorWithState.Since != nil {
+                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
diff --git a/backend/plugins/tapd/tasks/task_commit_collector.go 
b/backend/plugins/tapd/tasks/task_commit_collector.go
index 88da97d6e..0ef938997 100644
--- a/backend/plugins/tapd/tasks/task_commit_collector.go
+++ b/backend/plugins/tapd/tasks/task_commit_collector.go
@@ -44,17 +44,13 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect issueCommits")
-       incremental := collectorWithState.IsIncremental()
        clauses := []dal.Clause{
                dal.Select("_tool_tapd_tasks.id as issue_id, modified as 
update_time"),
                dal.From(&models.TapdTask{}),
                dal.Where("_tool_tapd_tasks.connection_id = ? and 
_tool_tapd_tasks.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.TimeAfter))
-       }
-       if incremental {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
+       if collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -66,7 +62,6 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
-               Incremental: incremental,
                Input:       iterator,
                UrlTemplate: "code_commit_infos",
                Query: func(reqData *api.RequestData) (url.Values, 
errors.Error) {
diff --git a/backend/plugins/tapd/tasks/worklog_collector.go 
b/backend/plugins/tapd/tasks/worklog_collector.go
index 935f89758..b091f9e44 100644
--- a/backend/plugins/tapd/tasks/worklog_collector.go
+++ b/backend/plugins/tapd/tasks/worklog_collector.go
@@ -38,9 +38,8 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental()
+
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               Incremental: incremental,
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "timesheets",
@@ -50,11 +49,8 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
-                       }
-                       if incremental {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+                       if collectorWithState.Since != nil {
+                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
diff --git a/backend/plugins/zentao/tasks/bug_commits_collector.go 
b/backend/plugins/zentao/tasks/bug_commits_collector.go
index 526ffc8a4..cb55ceda6 100644
--- a/backend/plugins/zentao/tasks/bug_commits_collector.go
+++ b/backend/plugins/zentao/tasks/bug_commits_collector.go
@@ -70,13 +70,8 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                        data.Options.ProjectId, data.Options.ConnectionId,
                ),
        }
-       // incremental collection
-       incremental := collectorWithState.IsIncremental()
-       if incremental {
-               clauses = append(
-                       clauses,
-                       dal.Where("last_edited_date is not null and 
last_edited_date > ?", collectorWithState.LatestState.LatestSuccessStart),
-               )
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("last_edited_date is not 
null and last_edited_date > ?", collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -95,7 +90,6 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                },
                ApiClient:   data.ApiClient,
                Input:       iterator,
-               Incremental: incremental,
                UrlTemplate: "bugs/{{ .Input.BugId }}",
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
                        var data struct {
diff --git a/backend/plugins/zentao/tasks/story_commits_collector.go 
b/backend/plugins/zentao/tasks/story_commits_collector.go
index 117fdc7d9..a536f52f1 100644
--- a/backend/plugins/zentao/tasks/story_commits_collector.go
+++ b/backend/plugins/zentao/tasks/story_commits_collector.go
@@ -66,13 +66,8 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where(`_tool_zentao_project_stories.project_id = ? and
                        _tool_zentao_project_stories.connection_id = ?`, 
data.Options.ProjectId, data.Options.ConnectionId),
        }
-       // incremental collection
-       incremental := collectorWithState.IsIncremental()
-       if incremental {
-               clauses = append(
-                       clauses,
-                       dal.Where("last_edited_date is not null and 
last_edited_date > ?", collectorWithState.LatestState.LatestSuccessStart),
-               )
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("last_edited_date is not 
null and last_edited_date > ?", collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -93,7 +88,6 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                },
                ApiClient:   data.ApiClient,
                Input:       iterator,
-               Incremental: incremental,
                UrlTemplate: "stories/{{ .Input.ID }}",
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
                        var data struct {
diff --git a/backend/plugins/zentao/tasks/task_commits_collector.go 
b/backend/plugins/zentao/tasks/task_commits_collector.go
index 4d4c54feb..665069508 100644
--- a/backend/plugins/zentao/tasks/task_commits_collector.go
+++ b/backend/plugins/zentao/tasks/task_commits_collector.go
@@ -64,13 +64,8 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                        data.Options.ProjectId, data.Options.ConnectionId,
                ),
        }
-       // incremental collection
-       incremental := collectorWithState.IsIncremental()
-       if incremental {
-               clauses = append(
-                       clauses,
-                       dal.Where("last_edited_date is not null and 
last_edited_date > ?", collectorWithState.LatestState.LatestSuccessStart),
-               )
+       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+               clauses = append(clauses, dal.Where("last_edited_date is not 
null and last_edited_date > ?", collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -91,7 +86,6 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                },
                ApiClient:   data.ApiClient,
                Input:       iterator,
-               Incremental: incremental,
                UrlTemplate: "tasks/{{ .Input.ID }}",
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
                        var data struct {

Reply via email to