This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 40669df5a refactor: StatefulApiCollector adopts CollectorStateManager (#7324)
40669df5a is described below

commit 40669df5a46c144f2478876fa0da613b9167a0f7
Author: Klesh Wong <[email protected]>
AuthorDate: Mon Apr 15 16:11:01 2024 +0800

    refactor: StatefulApiCollector adopts CollectorStateManager (#7324)
---
 ...tor_with_state.go => api_collector_stateful.go} | 155 ++++-----------------
 .../plugins/azuredevops_go/tasks/pr_collector.go   |  15 +-
 backend/plugins/bitbucket/tasks/api_common.go      |  24 ++--
 .../plugins/bitbucket_server/tasks/api_common.go   |   6 +-
 backend/plugins/github/tasks/cicd_job_collector.go |  10 +-
 backend/plugins/github/tasks/comment_collector.go  |  10 +-
 backend/plugins/github/tasks/commit_collector.go   |  10 +-
 backend/plugins/github/tasks/issue_collector.go    |  10 +-
 .../plugins/github/tasks/pr_commit_collector.go    |  10 +-
 .../plugins/github/tasks/pr_review_collector.go    |  10 +-
 .../github/tasks/pr_review_comment_collector.go    |  10 +-
 .../github_graphql/tasks/deployment_collector.go   |   8 +-
 .../github_graphql/tasks/issue_collector.go        |   8 +-
 .../plugins/github_graphql/tasks/job_collector.go  |  10 +-
 .../plugins/github_graphql/tasks/pr_collector.go   |   8 +-
 .../plugins/gitlab/tasks/deployment_collector.go   |  14 +-
 backend/plugins/gitlab/tasks/issue_collector.go    |   4 +-
 backend/plugins/gitlab/tasks/mr_collector.go       |  10 +-
 .../plugins/gitlab/tasks/mr_detail_collector.go    |   6 +-
 backend/plugins/gitlab/tasks/pipeline_collector.go |  10 +-
 .../gitlab/tasks/pipeline_detail_collector.go      |   6 +-
 backend/plugins/gitlab/tasks/shared.go             |   8 +-
 .../plugins/gitlab/tasks/trigger_job_collector.go  |  16 +--
 backend/plugins/jenkins/tasks/stage_collector.go   |  10 +-
 .../jira/tasks/development_panel_collector.go      |  10 +-
 backend/plugins/jira/tasks/epic_collector.go       |  10 +-
 .../jira/tasks/issue_changelog_collector.go        |  10 +-
 backend/plugins/jira/tasks/issue_collector.go      |  10 +-
 .../plugins/jira/tasks/issue_comment_collector.go  |  10 +-
 backend/plugins/jira/tasks/remotelink_collector.go |  10 +-
 backend/plugins/jira/tasks/worklog_collector.go    |  16 ++-
 .../plugins/tapd/tasks/bug_changelog_collector.go  |  10 +-
 backend/plugins/tapd/tasks/bug_collector.go        |  10 +-
 backend/plugins/tapd/tasks/bug_commit_collector.go |  10 +-
 backend/plugins/tapd/tasks/iteration_collector.go  |  10 +-
 backend/plugins/tapd/tasks/story_bug_collector.go  |  10 +-
 .../tapd/tasks/story_changelog_collector.go        |   4 +-
 backend/plugins/tapd/tasks/story_collector.go      |  10 +-
 .../plugins/tapd/tasks/story_commit_collector.go   |  10 +-
 .../plugins/tapd/tasks/task_changelog_collector.go |  10 +-
 backend/plugins/tapd/tasks/task_collector.go       |   6 +-
 .../plugins/tapd/tasks/task_commit_collector.go    |  10 +-
 backend/plugins/tapd/tasks/worklog_collector.go    |  10 +-
 .../plugins/zentao/tasks/bug_commits_collector.go  |  10 +-
 .../zentao/tasks/story_commits_collector.go        |  10 +-
 .../plugins/zentao/tasks/task_commits_collector.go |  10 +-
 46 files changed, 255 insertions(+), 349 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go 
b/backend/helpers/pluginhelper/api/api_collector_stateful.go
similarity index 67%
rename from backend/helpers/pluginhelper/api/api_collector_with_state.go
rename to backend/helpers/pluginhelper/api/api_collector_stateful.go
index 513c442cb..2c1086bbc 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_stateful.go
@@ -21,172 +21,74 @@ import (
        "encoding/json"
        "net/http"
        "net/url"
-       "reflect"
        "time"
 
-       "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
-       "github.com/apache/incubator-devlake/core/models"
        "github.com/apache/incubator-devlake/core/plugin"
 )
 
-// ApiCollectorStateManager save collector state in framework table
-type ApiCollectorStateManager struct {
+// StatefulApiCollector runs multiple collectors as a single subtask and 
maintains the state of the collector
+// mainly the time range to collect across multiple collections. It is useful 
when you need to support timeAfter
+// and diff sync for APIs that do not support filtering by the updated date.
+type StatefulApiCollector struct {
        RawDataSubTaskArgs
+       CollectorStateManager
        // *ApiCollector
        // *GraphqlCollector
-       subtasks      []plugin.SubTask
-       newState      models.CollectorLatestState
-       IsIncremental bool
-       Since         *time.Time
-       Before        *time.Time
+       nestedCollectors []plugin.SubTask
 }
 
-type CollectorOptions struct {
-       TimeAfter string `json:"timeAfter,omitempty" 
mapstructure:"timeAfter,omitempty"`
-}
-
-// NewStatefulApiCollector create a new ApiCollectorStateManager
-func NewStatefulApiCollector(args RawDataSubTaskArgs) 
(*ApiCollectorStateManager, errors.Error) {
-       db := args.Ctx.GetDal()
+// NewStatefulApiCollector create a new StatefulApiCollector
+func NewStatefulApiCollector(args RawDataSubTaskArgs) (*StatefulApiCollector, 
errors.Error) {
        syncPolicy := args.Ctx.TaskContext().SyncPolicy()
        rawDataSubTask, err := NewRawDataSubTask(args)
        if err != nil {
-               return nil, errors.Default.Wrap(err, "Couldn't resolve raw 
subtask args")
-       }
-
-       // get optionTimeAfter from options
-       data := args.Ctx.GetData()
-       value := reflect.ValueOf(data)
-       if value.Kind() == reflect.Ptr && value.Elem().Kind() == reflect.Struct 
{
-               options := value.Elem().FieldByName("Options")
-               if options.IsValid() && options.Kind() == reflect.Ptr && 
options.Elem().Kind() == reflect.Struct {
-                       collectorOptions := 
options.Elem().FieldByName("CollectorOptions")
-                       if collectorOptions.IsValid() && 
collectorOptions.Kind() == reflect.Struct {
-                               timeAfter := 
collectorOptions.FieldByName("TimeAfter")
-                               if timeAfter.IsValid() && timeAfter.Kind() == 
reflect.String && timeAfter.String() != "" {
-                                       optionTimeAfter, parseErr := 
time.Parse(time.RFC3339, timeAfter.String())
-                                       if parseErr != nil {
-                                               return nil, 
errors.Default.Wrap(parseErr, "Failed to parse timeAfter!")
-                                       }
-                                       if syncPolicy != nil {
-                                               syncPolicy.TimeAfter = 
&optionTimeAfter
-                                       } else {
-                                               syncPolicy = &models.SyncPolicy{
-                                                       TimeAfter: 
&optionTimeAfter,
-                                               }
-                                       }
-                               }
-                       }
-               }
+               return nil, err
        }
-
-       // CollectorLatestState retrieves the latest collector state from the 
database
-       oldState := models.CollectorLatestState{}
-       err = db.First(&oldState, dal.Where(`raw_data_table = ? AND 
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
+       stateManager, err := NewCollectorStateManager(args.Ctx, syncPolicy, 
rawDataSubTask.table, rawDataSubTask.params)
        if err != nil {
-               if db.IsErrorNotFound(err) {
-                       oldState = models.CollectorLatestState{
-                               RawDataTable:  rawDataSubTask.table,
-                               RawDataParams: rawDataSubTask.params,
-                       }
-               } else {
-                       return nil, errors.Default.Wrap(err, "failed to load 
JiraLatestCollectorMeta")
-               }
-       }
-       // Extract timeAfter and latestSuccessStart from old state
-       oldTimeAfter := oldState.TimeAfter
-       oldLatestSuccessStart := oldState.LatestSuccessStart
-
-       // Calculate incremental and since based on syncPolicy and old state
-       var isIncremental bool
-       var since *time.Time
-
-       if oldLatestSuccessStart == nil {
-               // 1. If no oldState.LatestSuccessStart, not incremental and 
since is syncPolicy.TimeAfter
-               isIncremental = false
-               if syncPolicy != nil {
-                       since = syncPolicy.TimeAfter
-               }
-       } else if syncPolicy == nil {
-               // 2. If no syncPolicy, incremental and since is 
oldState.LatestSuccessStart
-               isIncremental = true
-               since = oldLatestSuccessStart
-       } else if syncPolicy.FullSync {
-               // 3. If fullSync true, not incremental and since is 
syncPolicy.TimeAfter
-               isIncremental = false
-               since = syncPolicy.TimeAfter
-       } else if syncPolicy.TimeAfter == nil {
-               // 4. If no syncPolicy TimeAfter, incremental and since is 
oldState.LatestSuccessStart
-               isIncremental = true
-               since = oldLatestSuccessStart
-       } else {
-               // 5. If syncPolicy.TimeAfter not nil
-               if oldTimeAfter != nil && 
syncPolicy.TimeAfter.Before(*oldTimeAfter) {
-                       // 4.1 If oldTimeAfter not nil and syncPolicy.TimeAfter 
before oldTimeAfter, incremental is false and since is syncPolicy.TimeAfter
-                       isIncremental = false
-                       since = syncPolicy.TimeAfter
-               } else {
-                       // 4.2 If oldTimeAfter nil or syncPolicy.TimeAfter 
after oldTimeAfter, incremental is true and since is oldState.LatestSuccessStart
-                       isIncremental = true
-                       since = oldLatestSuccessStart
-               }
-       }
-
-       currentTime := time.Now()
-       oldState.LatestSuccessStart = &currentTime
-       if syncPolicy != nil {
-               oldState.TimeAfter = syncPolicy.TimeAfter
-               if syncPolicy.TimeAfter != nil && oldTimeAfter != nil && 
(oldTimeAfter).Before(*syncPolicy.TimeAfter) && !syncPolicy.FullSync {
-                       oldState.TimeAfter = oldTimeAfter
-               }
+               return nil, err
        }
-
-       return &ApiCollectorStateManager{
-               RawDataSubTaskArgs: args,
-               newState:           oldState,
-               IsIncremental:      isIncremental,
-               Since:              since,
-               Before:             &currentTime,
+       return &StatefulApiCollector{
+               RawDataSubTaskArgs:    args,
+               CollectorStateManager: *stateManager,
        }, nil
-
 }
 
-// InitCollector init the embedded collector
-func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) 
errors.Error {
+// InitCollector appends a new collector to the list
+func (m *StatefulApiCollector) InitCollector(args ApiCollectorArgs) 
errors.Error {
        args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
-       args.Incremental = args.Incremental || m.IsIncremental
+       args.Incremental = m.CollectorStateManager.IsIncremental()
        apiCollector, err := NewApiCollector(args)
        if err != nil {
                return err
        }
-       m.subtasks = append(m.subtasks, apiCollector)
+       m.nestedCollectors = append(m.nestedCollectors, apiCollector)
        return nil
 }
 
-// InitGraphQLCollector init the embedded collector
-func (m *ApiCollectorStateManager) InitGraphQLCollector(args 
GraphqlCollectorArgs) errors.Error {
+// InitGraphQLCollector appends a new GraphQL collector to the list
+func (m *StatefulApiCollector) InitGraphQLCollector(args GraphqlCollectorArgs) 
errors.Error {
        args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
-       args.Incremental = args.Incremental || m.IsIncremental
+       args.Incremental = m.CollectorStateManager.IsIncremental()
        graphqlCollector, err := NewGraphqlCollector(args)
        if err != nil {
                return err
        }
-       m.subtasks = append(m.subtasks, graphqlCollector)
+       m.nestedCollectors = append(m.nestedCollectors, graphqlCollector)
        return nil
 }
 
-// Execute the embedded collector and record execute state
-func (m *ApiCollectorStateManager) Execute() errors.Error {
-       for _, subtask := range m.subtasks {
+// Execute all nested collectors and save the state if all collectors succeed
+func (m *StatefulApiCollector) Execute() errors.Error {
+       for _, subtask := range m.nestedCollectors {
                err := subtask.Execute()
                if err != nil {
                        return err
                }
        }
 
-       db := m.Ctx.GetDal()
-       return db.CreateOrUpdate(&m.newState)
+       return m.CollectorStateManager.Close()
 }
 
 // NewStatefulApiCollectorForFinalizableEntity aims to add timeFilter/diffSync 
support for
@@ -222,8 +124,8 @@ func NewStatefulApiCollectorForFinalizableEntity(args 
FinalizableApiCollectorArg
                return nil, err
        }
 
-       createdAfter := manager.Since
-       isIncremental := manager.IsIncremental
+       createdAfter := manager.CollectorStateManager.GetSince()
+       isIncremental := manager.CollectorStateManager.IsIncremental()
 
        // step 1: create a collector to collect newly added records
        err = manager.InitCollector(ApiCollectorArgs{
@@ -329,7 +231,6 @@ func NewStatefulApiCollectorForFinalizableEntity(args 
FinalizableApiCollectorArg
 type FinalizableApiCollectorArgs struct {
        RawDataSubTaskArgs
        ApiClient                RateLimitedApiClient
-       TimeAfter                *time.Time // leave it be nil to disable time 
filter
        CollectNewRecordsByList  FinalizableApiCollectorListArgs
        CollectUnfinishedDetails *FinalizableApiCollectorDetailArgs
 }
diff --git a/backend/plugins/azuredevops_go/tasks/pr_collector.go 
b/backend/plugins/azuredevops_go/tasks/pr_collector.go
index 7933b3d46..30a63dba7 100644
--- a/backend/plugins/azuredevops_go/tasks/pr_collector.go
+++ b/backend/plugins/azuredevops_go/tasks/pr_collector.go
@@ -19,11 +19,12 @@ package tasks
 
 import (
        "fmt"
+       "net/url"
+       "time"
+
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-       "net/url"
-       "time"
 )
 
 func init() {
@@ -51,12 +52,12 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
                Options: data.Options,
        }
 
-       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
 
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
                ApiClient:          data.ApiClient,
                PageSize:           100,
@@ -67,9 +68,9 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("$skip", fmt.Sprint(reqData.Pager.Skip))
                        query.Set("$top", fmt.Sprint(reqData.Pager.Size))
 
-                       if collectorWithState.Since != nil {
+                       if apiCollector.GetSince() != nil {
                                query.Set("searchCriteria.queryTimeRangeType", 
"created")
-                               query.Set("searchCriteria.minTime", 
collectorWithState.Since.Format(time.RFC3339))
+                               query.Set("searchCriteria.minTime", 
apiCollector.GetSince().Format(time.RFC3339))
                        }
                        return query, nil
                },
@@ -81,5 +82,5 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/bitbucket/tasks/api_common.go 
b/backend/plugins/bitbucket/tasks/api_common.go
index 8990fdbaf..352f41cb9 100644
--- a/backend/plugins/bitbucket/tasks/api_common.go
+++ b/backend/plugins/bitbucket/tasks/api_common.go
@@ -93,7 +93,7 @@ func GetQuery(reqData *api.RequestData) (url.Values, 
errors.Error) {
 }
 
 // GetQueryCreatedAndUpdated is a common GeyQuery for timeFilter and 
incremental
-func GetQueryCreatedAndUpdated(fields string, collectorWithState 
*api.ApiCollectorStateManager) func(reqData *api.RequestData) (url.Values, 
errors.Error) {
+func GetQueryCreatedAndUpdated(fields string, apiCollector 
*api.StatefulApiCollector) func(reqData *api.RequestData) (url.Values, 
errors.Error) {
        return func(reqData *api.RequestData) (url.Values, errors.Error) {
                query, err := GetQuery(reqData)
                if err != nil {
@@ -102,8 +102,8 @@ func GetQueryCreatedAndUpdated(fields string, 
collectorWithState *api.ApiCollect
                query.Set("fields", fields)
                query.Set("sort", "created_on")
 
-               if collectorWithState.Since != nil {
-                       query.Set("q", fmt.Sprintf("updated_on>=%s", 
collectorWithState.Since.Format(time.RFC3339)))
+               if apiCollector.GetSince() != nil {
+                       query.Set("q", fmt.Sprintf("updated_on>=%s", 
apiCollector.GetSince().Format(time.RFC3339)))
                }
                return query, nil
        }
@@ -164,7 +164,7 @@ func GetRawMessageFromResponse(res *http.Response) 
([]json.RawMessage, errors.Er
        return rawMessages.Values, nil
 }
 
-func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState 
*api.ApiCollectorStateManager) (*api.DalCursorIterator, errors.Error) {
+func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, sac 
*api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*BitbucketTaskData)
        clauses := []dal.Clause{
@@ -175,8 +175,8 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *
                        data.Options.FullName, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*collectorWithState.Since))
+       if sac.IsIncremental() && sac.GetSince() != nil {
+               clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*sac.GetSince()))
        }
 
        // construct the input iterator
@@ -188,7 +188,7 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *
        return api.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(BitbucketInput{}))
 }
 
-func GetIssuesIterator(taskCtx plugin.SubTaskContext, collectorWithState 
*api.ApiCollectorStateManager) (*api.DalCursorIterator, errors.Error) {
+func GetIssuesIterator(taskCtx plugin.SubTaskContext, sac 
*api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*BitbucketTaskData)
        clauses := []dal.Clause{
@@ -199,8 +199,8 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *api.Ap
                        data.Options.FullName, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*collectorWithState.Since))
+       if sac.IsIncremental() && sac.GetSince() != nil {
+               clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*sac.GetSince()))
        }
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
@@ -211,7 +211,7 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *api.Ap
        return api.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(BitbucketInput{}))
 }
 
-func GetPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState 
*api.ApiCollectorStateManager) (*api.DalCursorIterator, errors.Error) {
+func GetPipelinesIterator(taskCtx plugin.SubTaskContext, sac 
*api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*BitbucketTaskData)
        clauses := []dal.Clause{
@@ -222,8 +222,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *api
                        data.Options.FullName, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("bitbucket_complete_on > 
?", *collectorWithState.Since))
+       if sac.IsIncremental() && sac.GetSince() != nil {
+               clauses = append(clauses, dal.Where("bitbucket_complete_on > 
?", *sac.GetSince()))
        }
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/bitbucket_server/tasks/api_common.go 
b/backend/plugins/bitbucket_server/tasks/api_common.go
index 55e99ce5b..0b2ad1f68 100644
--- a/backend/plugins/bitbucket_server/tasks/api_common.go
+++ b/backend/plugins/bitbucket_server/tasks/api_common.go
@@ -119,7 +119,7 @@ func GetRawMessageFromResponse(res *http.Response) 
([]json.RawMessage, errors.Er
        return rawMessages.Values, nil
 }
 
-func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState 
*helper.ApiCollectorStateManager) (*helper.DalCursorIterator, errors.Error) {
+func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector 
*helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*BitbucketServerTaskData)
        clauses := []dal.Clause{
@@ -131,8 +131,8 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *
                ),
        }
 
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, 
dal.Where("bpr.bitbucket_server_updated_at > ?", *collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, 
dal.Where("bpr.bitbucket_server_updated_at > ?", *apiCollector.GetSince()))
        }
 
        // construct the input iterator
diff --git a/backend/plugins/github/tasks/cicd_job_collector.go 
b/backend/plugins/github/tasks/cicd_job_collector.go
index dda4b90c0..60780339f 100644
--- a/backend/plugins/github/tasks/cicd_job_collector.go
+++ b/backend/plugins/github/tasks/cicd_job_collector.go
@@ -52,7 +52,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*GithubTaskData)
 
        // state manager
-       collectorWithState, err := 
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+       apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: GithubApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -73,8 +73,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
                        data.Options.GithubId, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("github_updated_at > ?", 
collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("github_updated_at > ?", 
apiCollector.GetSince()))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -85,7 +85,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
                return err
        }
        // collect jobs
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                RawDataSubTaskArgs: api.RawDataSubTaskArgs{
                        Ctx: taskCtx,
                        Params: GithubApiParams{
@@ -118,7 +118,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 type SimpleGithubRun struct {
diff --git a/backend/plugins/github/tasks/comment_collector.go 
b/backend/plugins/github/tasks/comment_collector.go
index e06f023f4..9f40e7736 100644
--- a/backend/plugins/github/tasks/comment_collector.go
+++ b/backend/plugins/github/tasks/comment_collector.go
@@ -46,7 +46,7 @@ var CollectApiCommentsMeta = plugin.SubTaskMeta{
 
 func CollectApiComments(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*GithubTaskData)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+       apiCollector, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: GithubApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -58,15 +58,15 @@ func CollectApiComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
                UrlTemplate: "repos/{{ .Params.Name }}/issues/comments",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("state", "all")
-                       if collectorWithState.Since != nil {
-                               query.Set("since", 
collectorWithState.Since.String())
+                       if apiCollector.GetSince() != nil {
+                               query.Set("since", 
apiCollector.GetSince().String())
                        }
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("direction", "asc")
@@ -89,5 +89,5 @@ func CollectApiComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                return errors.Default.Wrap(err, "error collecting github 
comments")
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/github/tasks/commit_collector.go 
b/backend/plugins/github/tasks/commit_collector.go
index b735a508d..019060c17 100644
--- a/backend/plugins/github/tasks/commit_collector.go
+++ b/backend/plugins/github/tasks/commit_collector.go
@@ -46,7 +46,7 @@ var CollectApiCommitsMeta = plugin.SubTaskMeta{
 
 func CollectApiCommits(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*GithubTaskData)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+       apiCollector, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: GithubApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -58,7 +58,7 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                ApiClient: data.ApiClient,
                PageSize:  100,
                /*
@@ -77,8 +77,8 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("state", "all")
-                       if collectorWithState.Since != nil {
-                               query.Set("since", 
collectorWithState.Since.String())
+                       if apiCollector.GetSince() != nil {
+                               query.Set("since", 
apiCollector.GetSince().String())
                        }
                        query.Set("direction", "asc")
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
@@ -117,5 +117,5 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/github/tasks/issue_collector.go 
b/backend/plugins/github/tasks/issue_collector.go
index 4e9b25d21..5558da697 100644
--- a/backend/plugins/github/tasks/issue_collector.go
+++ b/backend/plugins/github/tasks/issue_collector.go
@@ -46,7 +46,7 @@ var CollectApiIssuesMeta = plugin.SubTaskMeta{
 
 func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*GithubTaskData)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+       apiCollector, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: GithubApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -58,7 +58,7 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                ApiClient: data.ApiClient,
                PageSize:  100,
                /*
@@ -77,8 +77,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("state", "all")
-                       if collectorWithState.Since != nil {
-                               query.Set("since", 
collectorWithState.Since.String())
+                       if apiCollector.GetSince() != nil {
+                               query.Set("since", 
apiCollector.GetSince().String())
                        }
                        query.Set("direction", "asc")
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
@@ -117,5 +117,5 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/github/tasks/pr_commit_collector.go 
b/backend/plugins/github/tasks/pr_commit_collector.go
index 487e6ac14..0604816d8 100644
--- a/backend/plugins/github/tasks/pr_commit_collector.go
+++ b/backend/plugins/github/tasks/pr_commit_collector.go
@@ -61,7 +61,7 @@ func CollectApiPullRequestCommits(taskCtx 
plugin.SubTaskContext) errors.Error {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*GithubTaskData)
 
-       collectorWithState, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+       apiCollector, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: GithubApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -78,10 +78,10 @@ func CollectApiPullRequestCommits(taskCtx 
plugin.SubTaskContext) errors.Error {
                dal.From(models.GithubPullRequest{}.TableName()),
                dal.Where("repo_id = ? and connection_id=?", 
data.Options.GithubId, data.Options.ConnectionId),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
                clauses = append(
                        clauses,
-                       dal.Where("github_updated_at > ?", 
collectorWithState.Since),
+                       dal.Where("github_updated_at > ?", 
apiCollector.GetSince()),
                )
        }
 
@@ -95,7 +95,7 @@ func CollectApiPullRequestCommits(taskCtx 
plugin.SubTaskContext) errors.Error {
        if err != nil {
                return err
        }
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                ApiClient: data.ApiClient,
                PageSize:  100,
                Input:     iterator,
@@ -141,5 +141,5 @@ func CollectApiPullRequestCommits(taskCtx 
plugin.SubTaskContext) errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/github/tasks/pr_review_collector.go 
b/backend/plugins/github/tasks/pr_review_collector.go
index 60430c583..e4a16649d 100644
--- a/backend/plugins/github/tasks/pr_review_collector.go
+++ b/backend/plugins/github/tasks/pr_review_collector.go
@@ -53,7 +53,7 @@ func CollectApiPullRequestReviews(taskCtx 
plugin.SubTaskContext) errors.Error {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*GithubTaskData)
 
-       collectorWithState, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+       apiCollector, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: GithubApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -70,10 +70,10 @@ func CollectApiPullRequestReviews(taskCtx 
plugin.SubTaskContext) errors.Error {
                dal.From(models.GithubPullRequest{}.TableName()),
                dal.Where("repo_id = ? and connection_id=?", 
data.Options.GithubId, data.Options.ConnectionId),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
                clauses = append(
                        clauses,
-                       dal.Where("github_updated_at > ?", 
collectorWithState.Since),
+                       dal.Where("github_updated_at > ?", 
apiCollector.GetSince()),
                )
        }
 
@@ -89,7 +89,7 @@ func CollectApiPullRequestReviews(taskCtx 
plugin.SubTaskContext) errors.Error {
                return err
        }
 
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                ApiClient: data.ApiClient,
                PageSize:  100,
                Input:     iterator,
@@ -117,5 +117,5 @@ func CollectApiPullRequestReviews(taskCtx 
plugin.SubTaskContext) errors.Error {
        if err != nil {
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/github/tasks/pr_review_comment_collector.go 
b/backend/plugins/github/tasks/pr_review_comment_collector.go
index d53371444..d6dfb296f 100644
--- a/backend/plugins/github/tasks/pr_review_comment_collector.go
+++ b/backend/plugins/github/tasks/pr_review_comment_collector.go
@@ -49,7 +49,7 @@ var CollectApiPrReviewCommentsMeta = plugin.SubTaskMeta{
 func CollectPrReviewComments(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*GithubTaskData)
 
-       collectorWithState, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+       apiCollector, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: GithubApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -61,7 +61,7 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                ApiClient: data.ApiClient,
                PageSize:  100,
                Header: func(reqData *helper.RequestData) (http.Header, 
errors.Error) {
@@ -74,8 +74,8 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                UrlTemplate: "repos/{{ .Params.Name }}/pulls/comments",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       if collectorWithState.Since != nil {
-                               query.Set("since", 
collectorWithState.Since.String())
+                       if apiCollector.GetSince() != nil {
+                               query.Set("since", 
apiCollector.GetSince().String())
                        }
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("direction", "asc")
@@ -97,5 +97,5 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/github_graphql/tasks/deployment_collector.go 
b/backend/plugins/github_graphql/tasks/deployment_collector.go
index 437088629..e3a646baf 100644
--- a/backend/plugins/github_graphql/tasks/deployment_collector.go
+++ b/backend/plugins/github_graphql/tasks/deployment_collector.go
@@ -86,7 +86,7 @@ type GraphqlQueryDeploymentDeployment struct {
 // CollectDeployments will request github api via graphql and store the result 
into raw layer.
 func CollectDeployments(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*githubTasks.GithubTaskData)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+       apiCollector, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: githubTasks.GithubApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -98,7 +98,7 @@ func CollectDeployments(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       err = 
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
+       err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
                GraphqlClient: data.GraphqlClient,
                PageSize:      100,
                BuildQuery: func(reqData *helper.GraphqlRequestData) 
(interface{}, map[string]interface{}, error) {
@@ -124,7 +124,7 @@ func CollectDeployments(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query := iQuery.(*GraphqlQueryDeploymentWrapper)
                        deployments := query.Repository.Deployments.Deployments
                        for _, rawL := range deployments {
-                               if collectorWithState.Since != nil && 
!collectorWithState.Since.Before(rawL.UpdatedAt) {
+                               if apiCollector.GetSince() != nil && 
!apiCollector.GetSince().Before(rawL.UpdatedAt) {
                                        return nil, helper.ErrFinishCollect
                                }
                        }
@@ -134,5 +134,5 @@ func CollectDeployments(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go 
b/backend/plugins/github_graphql/tasks/issue_collector.go
index 313b7cbac..dbf1b1e95 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -82,7 +82,7 @@ var _ plugin.SubTaskEntryPoint = CollectIssues
 
 func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*githubTasks.GithubTaskData)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+       apiCollector, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: githubTasks.GithubApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -94,7 +94,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       err = 
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
+       err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
                GraphqlClient: data.GraphqlClient,
                PageSize:      10,
                BuildQuery: func(reqData *helper.GraphqlRequestData) 
(interface{}, map[string]interface{}, error) {
@@ -119,7 +119,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query := iQuery.(*GraphqlQueryIssueWrapper)
                        issues := query.Repository.IssueList.Issues
                        for _, rawL := range issues {
-                               if collectorWithState.Since != nil && 
!collectorWithState.Since.Before(rawL.UpdatedAt) {
+                               if apiCollector.GetSince() != nil && 
!apiCollector.GetSince().Before(rawL.UpdatedAt) {
                                        return nil, helper.ErrFinishCollect
                                }
                        }
@@ -130,5 +130,5 @@ func CollectIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go 
b/backend/plugins/github_graphql/tasks/job_collector.go
index 48ec17045..effac75ff 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -100,7 +100,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*githubTasks.GithubTaskData)
 
-       collectorWithState, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+       apiCollector, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: githubTasks.GithubApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -118,8 +118,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where("repo_id = ? and connection_id=?", 
data.Options.GithubId, data.Options.ConnectionId),
                dal.Orderby("github_updated_at DESC"),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("github_updated_at > ?", 
*collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("github_updated_at > ?", 
*apiCollector.GetSince()))
        }
 
        cursor, err := db.Cursor(
@@ -134,7 +134,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       err = 
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
+       err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
                Input:         iterator,
                InputStep:     20,
                GraphqlClient: data.GraphqlClient,
@@ -168,5 +168,5 @@ func CollectJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go 
b/backend/plugins/github_graphql/tasks/pr_collector.go
index 2bc887305..903ce6c5b 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -130,7 +130,7 @@ var _ plugin.SubTaskEntryPoint = CollectPrs
 func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*tasks.GithubTaskData)
        var err errors.Error
-       collectorWithState, err := 
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+       apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: tasks.GithubApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -142,7 +142,7 @@ func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error 
{
                return err
        }
 
-       err = collectorWithState.InitGraphQLCollector(api.GraphqlCollectorArgs{
+       err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{
                GraphqlClient: data.GraphqlClient,
                PageSize:      10,
                /*
@@ -170,7 +170,7 @@ func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error 
{
                        query := iQuery.(*GraphqlQueryPrWrapper)
                        prs := query.Repository.PullRequests.Prs
                        for _, rawL := range prs {
-                               if collectorWithState.Since != nil && 
!collectorWithState.Since.Before(rawL.CreatedAt) {
+                               if apiCollector.GetSince() != nil && 
!apiCollector.GetSince().Before(rawL.CreatedAt) {
                                        return nil, api.ErrFinishCollect
                                }
                        }
@@ -181,5 +181,5 @@ func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error 
{
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/gitlab/tasks/deployment_collector.go 
b/backend/plugins/gitlab/tasks/deployment_collector.go
index 6c4c2a74a..36ecb60c1 100644
--- a/backend/plugins/gitlab/tasks/deployment_collector.go
+++ b/backend/plugins/gitlab/tasks/deployment_collector.go
@@ -49,11 +49,11 @@ var CollectDeploymentMeta = plugin.SubTaskMeta{
 
 func CollectDeployment(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_DEPLOYMENT)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
                ApiClient:          data.ApiClient,
                PageSize:           100,
@@ -70,11 +70,11 @@ func CollectDeployment(taskCtx plugin.SubTaskContext) 
errors.Error {
                        } else {
                                query.Set("order_by", "created_at")
                        }
-                       if collectorWithState.Since != nil {
-                               query.Set("updated_after", 
collectorWithState.Since.Format(time.RFC3339))
+                       if apiCollector.GetSince() != nil {
+                               query.Set("updated_after", 
apiCollector.GetSince().Format(time.RFC3339))
                        }
-                       if collectorWithState.Before != nil {
-                               query.Set("updated_before", 
collectorWithState.Before.Format(time.RFC3339))
+                       if apiCollector.GetUntil() != nil {
+                               query.Set("updated_before", 
apiCollector.GetUntil().Format(time.RFC3339))
                        }
                        return query, nil
                },
@@ -84,5 +84,5 @@ func CollectDeployment(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/gitlab/tasks/issue_collector.go 
b/backend/plugins/gitlab/tasks/issue_collector.go
index 92c3fbc21..9ae57fb4f 100644
--- a/backend/plugins/gitlab/tasks/issue_collector.go
+++ b/backend/plugins/gitlab/tasks/issue_collector.go
@@ -61,8 +61,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                */
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       if collectorWithState.Since != nil {
-                               query.Set("updated_after", 
collectorWithState.Since.Format(time.RFC3339))
+                       if collectorWithState.GetSince() != nil {
+                               query.Set("updated_after", 
collectorWithState.GetSince().Format(time.RFC3339))
                        }
                        query.Set("sort", "asc")
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
diff --git a/backend/plugins/gitlab/tasks/mr_collector.go 
b/backend/plugins/gitlab/tasks/mr_collector.go
index 029b19043..320d977d2 100644
--- a/backend/plugins/gitlab/tasks/mr_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_collector.go
@@ -43,12 +43,12 @@ var CollectApiMergeRequestsMeta = plugin.SubTaskMeta{
 
 func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_MERGE_REQUEST_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
 
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                ApiClient:      data.ApiClient,
                PageSize:       100,
                UrlTemplate:    "projects/{{ .Params.ProjectId 
}}/merge_requests",
@@ -59,8 +59,8 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
                        if err != nil {
                                return nil, err
                        }
-                       if collectorWithState.Since != nil {
-                               query.Set("updated_after", 
collectorWithState.Since.Format(time.RFC3339))
+                       if apiCollector.GetSince() != nil {
+                               query.Set("updated_after", 
apiCollector.GetSince().Format(time.RFC3339))
                        }
                        return query, nil
                },
@@ -70,5 +70,5 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/gitlab/tasks/mr_detail_collector.go 
b/backend/plugins/gitlab/tasks/mr_detail_collector.go
index eb4f8f3a6..bfb64f031 100644
--- a/backend/plugins/gitlab/tasks/mr_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_detail_collector.go
@@ -74,7 +74,7 @@ func CollectApiMergeRequestDetails(taskCtx 
plugin.SubTaskContext) errors.Error {
        return collectorWithState.Execute()
 }
 
-func GetMergeRequestDetailsIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *helper.ApiCollectorStateManager) 
(*helper.DalCursorIterator, errors.Error) {
+func GetMergeRequestDetailsIterator(taskCtx plugin.SubTaskContext, 
apiCollector *helper.StatefulApiCollector) (*helper.DalCursorIterator, 
errors.Error) {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*GitlabTaskData)
        clauses := []dal.Clause{
@@ -85,8 +85,8 @@ func GetMergeRequestDetailsIterator(taskCtx 
plugin.SubTaskContext, collectorWith
                        data.Options.ProjectId, data.Options.ConnectionId, true,
                ),
        }
-       if collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*apiCollector.GetSince()))
        }
 
        // construct the input iterator
diff --git a/backend/plugins/gitlab/tasks/pipeline_collector.go 
b/backend/plugins/gitlab/tasks/pipeline_collector.go
index ceb911d68..77415506f 100644
--- a/backend/plugins/gitlab/tasks/pipeline_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_collector.go
@@ -44,7 +44,7 @@ var CollectApiPipelinesMeta = plugin.SubTaskMeta{
 
 func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_PIPELINE_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
@@ -54,7 +54,7 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
                ApiClient:          data.ApiClient,
                MinTickInterval:    &tickInterval,
@@ -62,8 +62,8 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) 
errors.Error {
                UrlTemplate:        "projects/{{ .Params.ProjectId 
}}/pipelines",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       if collectorWithState.Since != nil {
-                               query.Set("updated_after", 
collectorWithState.Since.Format(time.RFC3339))
+                       if apiCollector.GetSince() != nil {
+                               query.Set("updated_after", 
apiCollector.GetSince().Format(time.RFC3339))
                        }
                        query.Set("with_stats", "true")
                        query.Set("sort", "asc")
@@ -78,5 +78,5 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go 
b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
index 227d19e93..a4bed6b0d 100644
--- a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
@@ -82,7 +82,7 @@ func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext) 
errors.Error {
        return collectorWithState.Execute()
 }
 
-func GetPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState 
*helper.ApiCollectorStateManager) (*helper.DalCursorIterator, errors.Error) {
+func GetPipelinesIterator(taskCtx plugin.SubTaskContext, apiCollector 
*helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*GitlabTaskData)
        clauses := []dal.Clause{
@@ -93,8 +93,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *hel
                        data.Options.ProjectId, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.Since != nil && collectorWithState.IsIncremental {
-               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*apiCollector.GetSince()))
        }
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/gitlab/tasks/shared.go 
b/backend/plugins/gitlab/tasks/shared.go
index f20514961..4051c93b5 100644
--- a/backend/plugins/gitlab/tasks/shared.go
+++ b/backend/plugins/gitlab/tasks/shared.go
@@ -168,7 +168,7 @@ func CreateRawDataSubTaskArgs(taskCtx 
plugin.SubTaskContext, Table string) (*hel
        return rawDataSubTaskArgs, data
 }
 
-func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *helper.ApiCollectorStateManager) 
(*helper.DalCursorIterator, errors.Error) {
+func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector 
*helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*GitlabTaskData)
        clauses := []dal.Clause{
@@ -179,9 +179,9 @@ func GetMergeRequestsIterator(taskCtx 
plugin.SubTaskContext, collectorWithState
                        data.Options.ProjectId, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState != nil {
-               if collectorWithState.Since != nil {
-                       clauses = append(clauses, dal.Where("gitlab_updated_at 
> ?", *collectorWithState.Since))
+       if apiCollector != nil {
+               if apiCollector.GetSince() != nil {
+                       clauses = append(clauses, dal.Where("gitlab_updated_at 
> ?", *apiCollector.GetSince()))
                }
        }
        // construct the input iterator
diff --git a/backend/plugins/gitlab/tasks/trigger_job_collector.go 
b/backend/plugins/gitlab/tasks/trigger_job_collector.go
index 755eeb67a..37ceba215 100644
--- a/backend/plugins/gitlab/tasks/trigger_job_collector.go
+++ b/backend/plugins/gitlab/tasks/trigger_job_collector.go
@@ -44,7 +44,7 @@ var CollectApiTriggerJobsMeta = plugin.SubTaskMeta{
 
 func CollectApiTriggerJobs(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_TRIGGER_JOB_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
@@ -52,17 +52,15 @@ func CollectApiTriggerJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       iterator, err := GetAllPipelinesIterator(taskCtx, collectorWithState)
+       iterator, err := GetAllPipelinesIterator(taskCtx, apiCollector)
        if err != nil {
                return err
        }
-       incremental := collectorWithState.IsIncremental
 
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                ApiClient:       data.ApiClient,
                MinTickInterval: &tickInterval,
                PageSize:        100,
-               Incremental:     incremental,
                Input:           iterator,
                UrlTemplate:     "projects/{{ .Params.ProjectId }}/pipelines/{{ 
.Input.GitlabId }}/bridges",
                ResponseParser:  GetRawMessageFromResponse,
@@ -73,10 +71,10 @@ func CollectApiTriggerJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
-func GetAllPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState 
*helper.ApiCollectorStateManager) (*helper.DalCursorIterator, errors.Error) {
+func GetAllPipelinesIterator(taskCtx plugin.SubTaskContext, apiCollector 
*helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*GitlabTaskData)
        clauses := []dal.Clause{
@@ -87,8 +85,8 @@ func GetAllPipelinesIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *
                        data.Options.ProjectId, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
apiCollector.GetSince()))
        }
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/jenkins/tasks/stage_collector.go 
b/backend/plugins/jenkins/tasks/stage_collector.go
index 4503d8560..30501065a 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -49,7 +49,7 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) 
errors.Error {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*JenkinsTaskData)
 
-       collectorWithState, err := 
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+       apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
                Params: JenkinsApiParams{
                        ConnectionId: data.Options.ConnectionId,
                        FullName:     data.Options.JobFullName,
@@ -67,8 +67,8 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and 
tjb.job_name = ? and tjb.class = ?`,
                        data.Options.ConnectionId, data.Options.JobPath, 
data.Options.JobName, "WorkflowRun"),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where(`tjb.start_time >= ?`, 
collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where(`tjb.start_time >= ?`, 
apiCollector.GetSince()))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -81,7 +81,7 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                Input:       iterator,
                UrlTemplate: fmt.Sprintf("%sjob/%s/{{ .Input.Number 
}}/wfapi/describe", data.Options.JobPath, data.Options.JobName),
@@ -109,5 +109,5 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/jira/tasks/development_panel_collector.go 
b/backend/plugins/jira/tasks/development_panel_collector.go
index c513c5bf5..1381b5475 100644
--- a/backend/plugins/jira/tasks/development_panel_collector.go
+++ b/backend/plugins/jira/tasks/development_panel_collector.go
@@ -53,7 +53,7 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
        db := taskCtx.GetDal()
        logger := taskCtx.GetLogger()
-       collectorWithState, err := 
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+       apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: JiraApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -72,8 +72,8 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ?", 
data.Options.ConnectionId, data.Options.BoardId),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("i.updated > ?", 
apiCollector.GetSince()))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -87,7 +87,7 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                ApiClient: data.ApiClient,
                Input:     iterator,
                // the URL looks like:
@@ -122,5 +122,5 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/jira/tasks/epic_collector.go 
b/backend/plugins/jira/tasks/epic_collector.go
index 25cbd3998..f7f79ddf2 100644
--- a/backend/plugins/jira/tasks/epic_collector.go
+++ b/backend/plugins/jira/tasks/epic_collector.go
@@ -59,7 +59,7 @@ func CollectEpics(taskCtx plugin.SubTaskContext) errors.Error 
{
                return err
        }
 
-       collectorWithState, err := 
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+       apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: JiraApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -78,11 +78,11 @@ func CollectEpics(taskCtx plugin.SubTaskContext) 
errors.Error {
                logger.Info("got user's timezone: %v", loc.String())
        }
        jql := "ORDER BY created ASC"
-       if collectorWithState.Since != nil {
-               jql = "and " + buildJQL(*collectorWithState.Since, loc)
+       if apiCollector.GetSince() != nil {
+               jql = "and " + buildJQL(*apiCollector.GetSince(), loc)
        }
 
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
                Incremental: false,
@@ -123,7 +123,7 @@ func CollectEpics(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 func GetEpicKeysIterator(db dal.Dal, data *JiraTaskData, batchSize int) 
(api.Iterator, errors.Error) {
diff --git a/backend/plugins/jira/tasks/issue_changelog_collector.go 
b/backend/plugins/jira/tasks/issue_changelog_collector.go
index 9f29436c9..76c1eef38 100644
--- a/backend/plugins/jira/tasks/issue_changelog_collector.go
+++ b/backend/plugins/jira/tasks/issue_changelog_collector.go
@@ -53,7 +53,7 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        logger := taskCtx.GetLogger()
        db := taskCtx.GetDal()
 
-       collectorWithState, err := 
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+       apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: JiraApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -71,8 +71,8 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ? AND 
i.std_type != ? and i.changelog_total > 100", data.Options.ConnectionId, 
data.Options.BoardId, "Epic"),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("i.updated > ?", 
apiCollector.GetSince()))
        }
 
        if logger.IsLevelEnabled(log.LOG_DEBUG) {
@@ -95,7 +95,7 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
 
        // now, let ApiCollector takes care the rest
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                ApiClient:     data.ApiClient,
                PageSize:      100,
                GetTotalPages: GetTotalPagesFromResponse,
@@ -125,5 +125,5 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/jira/tasks/issue_collector.go 
b/backend/plugins/jira/tasks/issue_collector.go
index 3faaa9d51..84ac2d727 100644
--- a/backend/plugins/jira/tasks/issue_collector.go
+++ b/backend/plugins/jira/tasks/issue_collector.go
@@ -48,7 +48,7 @@ var CollectIssuesMeta = plugin.SubTaskMeta{
 func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*JiraTaskData)
        logger := taskCtx.GetLogger()
-       collectorWithState, err := 
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+       apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
                Ctx: taskCtx,
                /*
                        This struct will be JSONEncoded and stored into 
database along with raw data itself, to identity minimal
@@ -77,11 +77,11 @@ func CollectIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                logger.Info("got user's timezone: %v", loc.String())
        }
        jql := "ORDER BY created ASC"
-       if collectorWithState.Since != nil {
-               jql = buildJQL(*collectorWithState.Since, loc)
+       if apiCollector.GetSince() != nil {
+               jql = buildJQL(*apiCollector.GetSince(), loc)
        }
 
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                ApiClient: data.ApiClient,
                PageSize:  data.Options.PageSize,
                /*
@@ -143,7 +143,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 // buildJQL build jql based on timeAfter and incremental mode
diff --git a/backend/plugins/jira/tasks/issue_comment_collector.go 
b/backend/plugins/jira/tasks/issue_comment_collector.go
index aa6ca2cdd..5ef2777be 100644
--- a/backend/plugins/jira/tasks/issue_comment_collector.go
+++ b/backend/plugins/jira/tasks/issue_comment_collector.go
@@ -53,7 +53,7 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) 
errors.Error {
        logger := taskCtx.GetLogger()
        db := taskCtx.GetDal()
 
-       collectorWithState, err := 
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+       apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: JiraApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -71,8 +71,8 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ? AND 
i.std_type != ? AND i.comment_total > 100", data.Options.ConnectionId, 
data.Options.BoardId, "Epic"),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("i.updated > ?", 
apiCollector.GetSince()))
        }
        if logger.IsLevelEnabled(log.LOG_DEBUG) {
                count, err := db.Count(clauses...)
@@ -94,7 +94,7 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) 
errors.Error {
        }
 
        // now, let ApiCollector takes care the rest
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                ApiClient:     data.ApiClient,
                PageSize:      100,
                GetTotalPages: GetTotalPagesFromResponse,
@@ -124,5 +124,5 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/jira/tasks/remotelink_collector.go 
b/backend/plugins/jira/tasks/remotelink_collector.go
index 2072b1f28..6b44ca7e5 100644
--- a/backend/plugins/jira/tasks/remotelink_collector.go
+++ b/backend/plugins/jira/tasks/remotelink_collector.go
@@ -52,7 +52,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) 
errors.Error {
        logger := taskCtx.GetLogger()
        logger.Info("collect remotelink")
 
-       collectorWithState, err := 
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+       apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: JiraApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -70,8 +70,8 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ?", 
data.Options.ConnectionId, data.Options.BoardId),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("i.updated > ?", 
apiCollector.GetSince()))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -85,7 +85,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                Input:       iterator,
                UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/remotelink",
@@ -105,7 +105,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       err = collectorWithState.Execute()
+       err = apiCollector.Execute()
        if err != nil {
                return err
        }
diff --git a/backend/plugins/jira/tasks/worklog_collector.go 
b/backend/plugins/jira/tasks/worklog_collector.go
index a22005bc3..8112a6d0a 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -45,7 +45,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
 
        logger := taskCtx.GetLogger()
 
-       collectorWithState, err := 
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+       apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: JiraApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -66,8 +66,14 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where("i.updated > i.created AND bi.connection_id = ?  AND 
bi.board_id = ?  ", data.Options.ConnectionId, data.Options.BoardId),
                dal.Groupby("i.issue_id, i.updated"),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Having("i.updated > ? AND 
(i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND 
COUNT(wl.worklog_id) > 0))", collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(
+                       clauses,
+                       dal.Having(
+                               "i.updated > ? AND (i.updated > 
max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND 
COUNT(wl.worklog_id) > 0))",
+                               apiCollector.GetSince(),
+                       ),
+               )
        } else {
                /*
                        i.updated > max(wl.issue_updated) was deleted because 
for non-incremental collection,
@@ -89,7 +95,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                Input:         iterator,
                ApiClient:     data.ApiClient,
                UrlTemplate:   "api/2/issue/{{ .Input.IssueId }}/worklog",
@@ -112,5 +118,5 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
diff --git a/backend/plugins/tapd/tasks/bug_changelog_collector.go 
b/backend/plugins/tapd/tasks/bug_changelog_collector.go
index bf768e0d1..0822eeee9 100644
--- a/backend/plugins/tapd/tasks/bug_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/bug_changelog_collector.go
@@ -34,12 +34,12 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_BUG_CHANGELOG_TABLE)
        logger := taskCtx.GetLogger()
        logger.Info("collect storyChangelogs")
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
 
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "bug_changes",
@@ -49,8 +49,8 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created desc")
-                       if collectorWithState.Since != nil {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if apiCollector.GetSince() != nil {
+                               query.Set("created", fmt.Sprintf(">%s", 
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
@@ -60,7 +60,7 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect story changelog error")
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 var CollectBugChangelogMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/bug_collector.go 
b/backend/plugins/tapd/tasks/bug_collector.go
index 746c1bd89..56479b139 100644
--- a/backend/plugins/tapd/tasks/bug_collector.go
+++ b/backend/plugins/tapd/tasks/bug_collector.go
@@ -34,12 +34,12 @@ func CollectBugs(taskCtx plugin.SubTaskContext) 
errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_BUG_TABLE)
        logger := taskCtx.GetLogger()
        logger.Info("collect bugs")
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
 
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "bugs",
@@ -50,8 +50,8 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error {
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if collectorWithState.Since != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if apiCollector.GetSince() != nil {
+                               query.Set("modified", fmt.Sprintf(">%s", 
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
@@ -61,7 +61,7 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error {
                logger.Error(err, "collect bug error")
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 var CollectBugMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/bug_commit_collector.go 
b/backend/plugins/tapd/tasks/bug_commit_collector.go
index 3164adb05..f3abeafe8 100644
--- a/backend/plugins/tapd/tasks/bug_commit_collector.go
+++ b/backend/plugins/tapd/tasks/bug_commit_collector.go
@@ -38,7 +38,7 @@ var _ plugin.SubTaskEntryPoint = CollectBugCommits
 func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_BUG_COMMIT_TABLE)
        db := taskCtx.GetDal()
-       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
@@ -50,8 +50,8 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.From(&models.TapdBug{}),
                dal.Where("_tool_tapd_bugs.connection_id = ? and 
_tool_tapd_bugs.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.Since))
+       if apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("modified > ?", 
*apiCollector.GetSince()))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -62,7 +62,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                Input:       iterator,
                UrlTemplate: "code_commit_infos",
@@ -87,7 +87,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect issueCommit error")
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 var CollectBugCommitMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/iteration_collector.go 
b/backend/plugins/tapd/tasks/iteration_collector.go
index 5f5512b1f..1fc7b72aa 100644
--- a/backend/plugins/tapd/tasks/iteration_collector.go
+++ b/backend/plugins/tapd/tasks/iteration_collector.go
@@ -36,12 +36,12 @@ func CollectIterations(taskCtx plugin.SubTaskContext) 
errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_ITERATION_TABLE)
        logger := taskCtx.GetLogger()
        logger.Info("collect iterations")
-       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
 
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                Concurrency: 3,
@@ -52,8 +52,8 @@ func CollectIterations(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if collectorWithState.Since != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if apiCollector.GetSince() != nil {
+                               query.Set("modified", fmt.Sprintf(">%s", 
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
@@ -69,7 +69,7 @@ func CollectIterations(taskCtx plugin.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect iteration error")
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 var CollectIterationMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/story_bug_collector.go 
b/backend/plugins/tapd/tasks/story_bug_collector.go
index 9d5d0ed0c..315498242 100644
--- a/backend/plugins/tapd/tasks/story_bug_collector.go
+++ b/backend/plugins/tapd/tasks/story_bug_collector.go
@@ -36,7 +36,7 @@ var _ plugin.SubTaskEntryPoint = CollectStoryBugs
 func CollectStoryBugs(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_STORY_BUG_TABLE)
        db := taskCtx.GetDal()
-       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
@@ -48,8 +48,8 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.From(&models.TapdStory{}),
                dal.Where("_tool_tapd_stories.connection_id = ? and 
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("modified > ?", 
*apiCollector.GetSince()))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -59,7 +59,7 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                Input:       iterator,
                UrlTemplate: "stories/get_related_bugs",
@@ -76,7 +76,7 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect storyBug error")
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 var CollectStoryBugMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/story_changelog_collector.go 
b/backend/plugins/tapd/tasks/story_changelog_collector.go
index d0597b7ef..83d195e26 100644
--- a/backend/plugins/tapd/tasks/story_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/story_changelog_collector.go
@@ -49,8 +49,8 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if collectorWithState.Since != nil {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if collectorWithState.GetSince() != nil {
+                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
diff --git a/backend/plugins/tapd/tasks/story_collector.go 
b/backend/plugins/tapd/tasks/story_collector.go
index 095c72b3d..f030abbef 100644
--- a/backend/plugins/tapd/tasks/story_collector.go
+++ b/backend/plugins/tapd/tasks/story_collector.go
@@ -34,12 +34,12 @@ func CollectStorys(taskCtx plugin.SubTaskContext) 
errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_STORY_TABLE)
        logger := taskCtx.GetLogger()
        logger.Info("collect stories")
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
 
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "stories",
@@ -50,8 +50,8 @@ func CollectStorys(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if collectorWithState.Since != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if apiCollector.GetSince() != nil {
+                               query.Set("modified", fmt.Sprintf(">%s", 
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
@@ -61,7 +61,7 @@ func CollectStorys(taskCtx plugin.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect story error")
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 var CollectStoryMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/story_commit_collector.go 
b/backend/plugins/tapd/tasks/story_commit_collector.go
index 708c71010..21044f3be 100644
--- a/backend/plugins/tapd/tasks/story_commit_collector.go
+++ b/backend/plugins/tapd/tasks/story_commit_collector.go
@@ -38,7 +38,7 @@ var _ plugin.SubTaskEntryPoint = CollectStoryCommits
 func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_STORY_COMMIT_TABLE)
        db := taskCtx.GetDal()
-       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
@@ -49,8 +49,8 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.From(&models.TapdStory{}),
                dal.Where("_tool_tapd_stories.connection_id = ? and 
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("modified > ?", 
*apiCollector.GetSince()))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -60,7 +60,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                Input:       iterator,
                UrlTemplate: "code_commit_infos",
@@ -85,7 +85,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect issueCommit error")
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 var CollectStoryCommitMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/task_changelog_collector.go 
b/backend/plugins/tapd/tasks/task_changelog_collector.go
index 7aba00c2a..66aa60b9e 100644
--- a/backend/plugins/tapd/tasks/task_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/task_changelog_collector.go
@@ -32,13 +32,13 @@ var _ plugin.SubTaskEntryPoint = CollectTaskChangelogs
 
 func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_TASK_CHANGELOG_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect taskChangelogs")
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "task_changes",
@@ -48,8 +48,8 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if collectorWithState.Since != nil {
-                               query.Set("created", fmt.Sprintf(">%s", 
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if apiCollector.GetSince() != nil {
+                               query.Set("created", fmt.Sprintf(">%s", 
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
@@ -59,7 +59,7 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) errors.Error {
                logger.Error(err, "collect task changelog error")
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 var CollectTaskChangelogMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/task_collector.go b/backend/plugins/tapd/tasks/task_collector.go
index 2d5bfdba3..9b5beb3a4 100644
--- a/backend/plugins/tapd/tasks/task_collector.go
+++ b/backend/plugins/tapd/tasks/task_collector.go
@@ -34,7 +34,7 @@ func CollectTasks(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_TASK_TABLE)
        logger := taskCtx.GetLogger()
        logger.Info("collect tasks")
-       collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
@@ -51,8 +51,8 @@ func CollectTasks(taskCtx plugin.SubTaskContext) errors.Error {
                        query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if collectorWithState.Since != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if apiCollector.GetSince() != nil {
+                               query.Set("modified", fmt.Sprintf(">%s", apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
diff --git a/backend/plugins/tapd/tasks/task_commit_collector.go b/backend/plugins/tapd/tasks/task_commit_collector.go
index 0ef938997..56c374d3a 100644
--- a/backend/plugins/tapd/tasks/task_commit_collector.go
+++ b/backend/plugins/tapd/tasks/task_commit_collector.go
@@ -38,7 +38,7 @@ var _ plugin.SubTaskEntryPoint = CollectTaskCommits
 func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_TASK_COMMIT_TABLE)
        db := taskCtx.GetDal()
-       collectorWithState, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
@@ -49,8 +49,8 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error {
                dal.From(&models.TapdTask{}),
                dal.Where("_tool_tapd_tasks.connection_id = ? and _tool_tapd_tasks.workspace_id = ? ", data.Options.ConnectionId, data.Options.WorkspaceId),
        }
-       if collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("modified > ?", *collectorWithState.Since))
+       if apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("modified > ?", *apiCollector.GetSince()))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -60,7 +60,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error {
        if err != nil {
                return err
        }
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                Input:       iterator,
                UrlTemplate: "code_commit_infos",
@@ -85,7 +85,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error {
                logger.Error(err, "collect issueCommit error")
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 var CollectTaskCommitMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/worklog_collector.go b/backend/plugins/tapd/tasks/worklog_collector.go
index b091f9e44..c651a2038 100644
--- a/backend/plugins/tapd/tasks/worklog_collector.go
+++ b/backend/plugins/tapd/tasks/worklog_collector.go
@@ -34,12 +34,12 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_WORKLOG_TABLE)
        logger := taskCtx.GetLogger()
        logger.Info("collect worklogs")
-       collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+       apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
 
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+       err = apiCollector.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    int(data.Options.PageSize),
                UrlTemplate: "timesheets",
@@ -49,8 +49,8 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if collectorWithState.Since != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if apiCollector.GetSince() != nil {
+                               query.Set("modified", fmt.Sprintf(">%s", apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
@@ -60,7 +60,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) errors.Error {
                logger.Error(err, "collect worklog error")
                return err
        }
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 var CollectWorklogMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/zentao/tasks/bug_commits_collector.go b/backend/plugins/zentao/tasks/bug_commits_collector.go
index c1ee2f74e..bf2a44116 100644
--- a/backend/plugins/zentao/tasks/bug_commits_collector.go
+++ b/backend/plugins/zentao/tasks/bug_commits_collector.go
@@ -52,7 +52,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*ZentaoTaskData)
 
        // state manager
-       collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+       apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
                Ctx:     taskCtx,
                Options: data.Options,
                Table:   RAW_BUG_COMMITS_TABLE,
@@ -70,8 +70,8 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error {
                        data.Options.ProjectId, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", apiCollector.GetSince()))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -82,7 +82,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error {
                return err
        }
        // collect bug commits
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                RawDataSubTaskArgs: api.RawDataSubTaskArgs{
                        Ctx:     taskCtx,
                        Options: data.Options,
@@ -111,7 +111,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 type SimpleZentaoBug struct {
diff --git a/backend/plugins/zentao/tasks/story_commits_collector.go b/backend/plugins/zentao/tasks/story_commits_collector.go
index 489ad2945..85404a39f 100644
--- a/backend/plugins/zentao/tasks/story_commits_collector.go
+++ b/backend/plugins/zentao/tasks/story_commits_collector.go
@@ -47,7 +47,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*ZentaoTaskData)
 
        // state manager
-       collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+       apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
                Ctx:     taskCtx,
                Options: data.Options,
                Table:   RAW_STORY_COMMITS_TABLE,
@@ -66,8 +66,8 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error {
                dal.Where(`_tool_zentao_project_stories.project_id = ? and
                        _tool_zentao_project_stories.connection_id = ?`, data.Options.ProjectId, data.Options.ConnectionId),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", apiCollector.GetSince()))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -80,7 +80,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error {
        }
 
        // collect story commits
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                RawDataSubTaskArgs: api.RawDataSubTaskArgs{
                        Ctx:     taskCtx,
                        Options: data.Options,
@@ -109,7 +109,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }
 
 type inputWithLastEditedDate struct {
diff --git a/backend/plugins/zentao/tasks/task_commits_collector.go b/backend/plugins/zentao/tasks/task_commits_collector.go
index 5a3204302..219a500fd 100644
--- a/backend/plugins/zentao/tasks/task_commits_collector.go
+++ b/backend/plugins/zentao/tasks/task_commits_collector.go
@@ -46,7 +46,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*ZentaoTaskData)
 
        // state manager
-       collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+       apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
                Ctx:     taskCtx,
                Options: data.Options,
                Table:   RAW_TASK_COMMITS_TABLE,
@@ -64,8 +64,8 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error {
                        data.Options.ProjectId, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
-               clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", collectorWithState.Since))
+       if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+               clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", apiCollector.GetSince()))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -78,7 +78,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error {
        }
 
        // collect task commits
-       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+       err = apiCollector.InitCollector(api.ApiCollectorArgs{
                RawDataSubTaskArgs: api.RawDataSubTaskArgs{
                        Ctx:     taskCtx,
                        Options: data.Options,
@@ -107,5 +107,5 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       return apiCollector.Execute()
 }


Reply via email to